[GitHub] flink pull request #2673: [FLINK-4864] [table] Shade Calcite dependency in f...

2016-10-20 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2673

[FLINK-4864] [table] Shade Calcite dependency in flink-table

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


This can resolve version conflicts when users have their own Calcite
dependency on the classpath.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink shade-calcite-FLINK-4864

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2673


commit 61d6c80789cce00f47c04df1e97ed0ea016fcbb3
Author: Jark Wu 
Date:   2016-10-21T05:50:21Z

[FLINK-4864] [table] Shade Calcite dependency in flink-table






[jira] [Commented] (FLINK-4864) Shade Calcite dependency in flink-table

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594176#comment-15594176
 ] 

ASF GitHub Bot commented on FLINK-4864:
---

GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2673

[FLINK-4864] [table] Shade Calcite dependency in flink-table

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


This can resolve version conflicts when users have their own Calcite
dependency on the classpath.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink shade-calcite-FLINK-4864

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2673


commit 61d6c80789cce00f47c04df1e97ed0ea016fcbb3
Author: Jark Wu 
Date:   2016-10-21T05:50:21Z

[FLINK-4864] [table] Shade Calcite dependency in flink-table




> Shade Calcite dependency in flink-table
> ---
>
> Key: FLINK-4864
> URL: https://issues.apache.org/jira/browse/FLINK-4864
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The Table API has a dependency on Apache Calcite.
> A user reported version conflicts when having their own Calcite
> dependency on the classpath.
> The solution would be to shade away the Calcite dependency (Calcite's 
> transitive dependencies are already shaded).





[jira] [Commented] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594169#comment-15594169
 ] 

ASF GitHub Bot commented on FLINK-4866:
---

GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2672

[FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce 
Implementation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR makes the Trigger.clear() method abstract, so that implementors
of custom triggers do not forget to clean up their state/timers properly.
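
For illustration, a minimal sketch (a hypothetical trigger written against the
Flink 1.x Trigger API; not code from this PR) of what clear() must undo. With
clear() abstract, omitting the last override becomes a compile error instead
of a silent state/timer leak:

{code}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class MaxTimestampTrigger extends Trigger<Object, TimeWindow> {

  private final ValueStateDescriptor<Long> fireTimestamp =
      new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, null);

  @Override
  public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
      TriggerContext ctx) throws Exception {
    ValueState<Long> ts = ctx.getPartitionedState(fireTimestamp);
    if (ts.value() == null) {
      // registers the timer and state that clear() must later remove
      ctx.registerEventTimeTimer(window.maxTimestamp());
      ts.update(window.maxTimestamp());
    }
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    // undo exactly what onElement() registered
    ValueState<Long> ts = ctx.getPartitionedState(fireTimestamp);
    if (ts.value() != null) {
      ctx.deleteEventTimeTimer(ts.value());
    }
    ts.clear();
  }
}
{code}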

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink trigger-FLINK-4866

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2672.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2672


commit 313b2a6f1871357b3281e623423f49ef84ad59a1
Author: Jark Wu 
Date:   2016-10-21T05:46:45Z

[FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce 
Implementation




> Make Trigger.clear() Abstract to Enforce Implementation
> ---
>
> Key: FLINK-4866
> URL: https://issues.apache.org/jira/browse/FLINK-4866
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
>
> If the method is not abstract, implementors of custom triggers will not 
> realise that it could be necessary, and they will likely not clean up their 
> state/timers properly.





[GitHub] flink pull request #2672: [FLINK-4866] [streaming] Make Trigger.clear() Abst...

2016-10-20 Thread wuchong
GitHub user wuchong opened a pull request:

https://github.com/apache/flink/pull/2672

[FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce 
Implementation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

This PR makes the Trigger.clear() method abstract, so that implementors
of custom triggers do not forget to clean up their state/timers properly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wuchong/flink trigger-FLINK-4866

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2672.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2672


commit 313b2a6f1871357b3281e623423f49ef84ad59a1
Author: Jark Wu 
Date:   2016-10-21T05:46:45Z

[FLINK-4866] [streaming] Make Trigger.clear() Abstract to Enforce 
Implementation






[jira] [Assigned] (FLINK-4864) Shade Calcite dependency in flink-table

2016-10-20 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4864:
--

Assignee: Jark Wu

> Shade Calcite dependency in flink-table
> ---
>
> Key: FLINK-4864
> URL: https://issues.apache.org/jira/browse/FLINK-4864
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>
> The Table API has a dependency on Apache Calcite.
> A user reported version conflicts when having their own Calcite
> dependency on the classpath.
> The solution would be to shade away the Calcite dependency (Calcite's 
> transitive dependencies are already shaded).





[jira] [Assigned] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation

2016-10-20 Thread Jark Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-4866:
--

Assignee: Jark Wu

> Make Trigger.clear() Abstract to Enforce Implementation
> ---
>
> Key: FLINK-4866
> URL: https://issues.apache.org/jira/browse/FLINK-4866
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
>
> If the method is not abstract, implementors of custom triggers will not 
> realise that it could be necessary, and they will likely not clean up their 
> state/timers properly.





[jira] [Commented] (FLINK-4862) NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15594036#comment-15594036
 ] 

ASF GitHub Bot commented on FLINK-4862:
---

GitHub user manuzhang opened a pull request:

https://github.com/apache/flink/pull/2671

[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/flink fix_merge_window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2671.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2671


commit b2370946357044250511e25fce5078812ad22c82
Author: manuzhang 
Date:   2016-10-20T07:06:01Z

[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger




> NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger
> --
>
> Key: FLINK-4862
> URL: https://issues.apache.org/jira/browse/FLINK-4862
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Windowing Operators
>Reporter: Manu Zhang
>
> h3. what's the error ?
> The following NPE is thrown when EventTimeSessionWindows is used with 
> ContinuousEventTimeTrigger.
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> h3. how to reproduce ?
> use {{ContinuousEventTimeTrigger}} instead of the default 
> {{EventTimeTrigger}} in [SessionWindowing | 
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java#L84]
>  example. 
> h3. what's the cause ?
> When two session windows are being merged, the states of the two 
> {{ContinuousEventTimeTrigger}}s are merged as well, and the new namespace is 
> the merged window. Later, when the context tries to delete the {{Timer}} from 
> the old trigger and looks up the timestamp by the old namespace, a null value 
> is returned. 
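
For reference, a minimal reproduction sketch (assuming an event-time
DataStream<Tuple2<String, Long>> named {{input}} with timestamps and
watermarks already assigned, and the usual org.apache.flink.streaming.api
imports; this is not code from the ticket):

{code}
// Swap the default EventTimeTrigger for ContinuousEventTimeTrigger on a
// session window; merging two sessions then hits the NPE in clear() above.
DataStream<Tuple2<String, Long>> result = input
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.seconds(3)))
    .trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))
    .sum(1);
{code}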





[GitHub] flink pull request #2671: [FLINK-4862] fix Timer register in ContinuousEvent...

2016-10-20 Thread manuzhang
GitHub user manuzhang opened a pull request:

https://github.com/apache/flink/pull/2671

[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/flink fix_merge_window

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2671.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2671


commit b2370946357044250511e25fce5078812ad22c82
Author: manuzhang 
Date:   2016-10-20T07:06:01Z

[FLINK-4862] fix Timer register in ContinuousEventTimeTrigger






[jira] [Closed] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge

2016-10-20 Thread Manu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manu Zhang closed FLINK-4863.
-
Resolution: Duplicate

> states of merging window and trigger are set to different TimeWindows on merge
> --
>
> Key: FLINK-4863
> URL: https://issues.apache.org/jira/browse/FLINK-4863
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Windowing Operators
>Reporter: Manu Zhang
>
> While the window state is set to the mergeResult's stateWindow (one of the 
> original windows), the trigger state is set to the mergeResult itself. This 
> will fail the {{Timer}} of {{ContinuousEventTimeTrigger}} since its window 
> cannot be found in the window state.





[jira] [Commented] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge

2016-10-20 Thread Manu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593811#comment-15593811
 ] 

Manu Zhang commented on FLINK-4863:
---

Found this is related to FLINK-4862. {{ContinuousEventTimeTrigger}}'s original 
state ({{fireTimestamp}}) is already cleared in 
{{context.onMerge(mergeWindows)}}, so the original {{Timer}} cannot be removed 
via the {{fireTimestamp}} in {{context.clear()}} later. I will close this and 
try to fix it in FLINK-4862.

> states of merging window and trigger are set to different TimeWindows on merge
> --
>
> Key: FLINK-4863
> URL: https://issues.apache.org/jira/browse/FLINK-4863
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Windowing Operators
>Reporter: Manu Zhang
>
> While the window state is set to the mergeResult's stateWindow (one of the 
> original windows), the trigger state is set to the mergeResult itself. This 
> will fail the {{Timer}} of {{ContinuousEventTimeTrigger}} since its window 
> cannot be found in the window state.





[jira] [Commented] (FLINK-4863) states of merging window and trigger are set to different TimeWindows on merge

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593797#comment-15593797
 ] 

ASF GitHub Bot commented on FLINK-4863:
---

Github user manuzhang closed the pull request at:

https://github.com/apache/flink/pull/2666


> states of merging window and trigger are set to different TimeWindows on merge
> --
>
> Key: FLINK-4863
> URL: https://issues.apache.org/jira/browse/FLINK-4863
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Windowing Operators
>Reporter: Manu Zhang
>
> While the window state is set to the mergeResult's stateWindow (one of the 
> original windows), the trigger state is set to the mergeResult itself. This 
> will fail the {{Timer}} of {{ContinuousEventTimeTrigger}} since its window 
> cannot be found in the window state.





[GitHub] flink pull request #2666: [FLINK-4863] fix trigger context window on merge

2016-10-20 Thread manuzhang
Github user manuzhang closed the pull request at:

https://github.com/apache/flink/pull/2666




[GitHub] flink issue #2617: [FLINK-4705] Instrument FixedLengthRecordSorter

2016-10-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2617
  
I have started to take a look at this...

The `FixedLengthRecordSorter` and its interactions with the serializers / 
comparators were not very well tested before, hence not activated. I am trying 
to double-check that...





[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-20 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2618
  
Thanks for the comments @mxm! I integrated them and am waiting for Travis.




[jira] [Commented] (FLINK-4204) Clean up gelly-examples

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15593069#comment-15593069
 ] 

ASF GitHub Bot commented on FLINK-4204:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2670

[FLINK-4204] [gelly] Clean up gelly-examples

Moves drivers into a separate package. Adds a default main class that prints a 
usage listing of the included classes. Includes documentation for running Gelly examples.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 4204_clean_up_gelly_examples

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2670.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2670


commit 642267c70f362ce5414838aaddbed0dcd6b60934
Author: Greg Hogan 
Date:   2016-08-24T15:32:43Z

[FLINK-4204] [gelly] Clean up gelly-examples

Moves drivers into a separate package. Adds a default main class that prints
a usage listing of the included classes. Includes documentation for running
Gelly examples.




> Clean up gelly-examples
> ---
>
> Key: FLINK-4204
> URL: https://issues.apache.org/jira/browse/FLINK-4204
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Vasia Kalavri
>Assignee: Greg Hogan
>
> The gelly-examples module has grown quite big (14 examples) and contains several 
> examples that illustrate the same functionality. Examples should help users 
> understand how to use the API and ideally show how to use 1-2 features.
> Also, it is helpful to state the purpose of each example in the comments.
> We should keep the example set small and move everything that does not fit 
> there to the library.
> I propose to remove the following:
> - ClusteringCoefficient: the functionality already exists as a library method.
> - HITS: the functionality already exists as a library method.
> - JaccardIndex: the functionality already exists as a library method.
> - SingleSourceShortestPaths: the example shows how to use scatter-gather 
> iterations. HITSAlgorithm shows the same feature plus the use of aggregators. 
> I propose we keep this one instead.
> - TriangleListing: the functionality already exists as a library method





[GitHub] flink pull request #2670: [FLINK-4204] [gelly] Clean up gelly-examples

2016-10-20 Thread greghogan
GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/2670

[FLINK-4204] [gelly] Clean up gelly-examples

Moves drivers into a separate package. Adds a default main class that prints a 
usage listing of the included classes. Includes documentation for running Gelly examples.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 4204_clean_up_gelly_examples

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2670.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2670


commit 642267c70f362ce5414838aaddbed0dcd6b60934
Author: Greg Hogan 
Date:   2016-08-24T15:32:43Z

[FLINK-4204] [gelly] Clean up gelly-examples

Moves drivers into a separate package. Adds a default main class that prints
a usage listing of the included classes. Includes documentation for running
Gelly examples.






[GitHub] flink issue #2506: [FLINK-4581] [table] Table API throws "No suitable driver...

2016-10-20 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2506
  
+1 to merge




[GitHub] flink pull request #2669: [FLINK-4871] [mini cluster] Add memory calculation...

2016-10-20 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2669

[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to 
MiniCluster

This PR is based on #2651, #2655 and #2657.

If the managed memory size for the task manager has not been set in the 
Configuration, it is automatically calculated by dividing the available 
memory by the number of distributed components. Additionally, this PR allows 
providing a MetricRegistry to the TaskManagerRunner. That way it is possible 
to use the MiniCluster's MetricRegistry.

Add memory calculation for task managers.
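
A rough sketch of the described calculation (illustrative class, method, and
config-key names; not the PR's actual code):

{code}
import org.apache.flink.configuration.Configuration;

class MemoryCalcSketch {
  // If no managed memory size is configured, split the available JVM memory
  // evenly across all locally running components.
  static long managedMemoryPerComponent(Configuration config, int numComponents) {
    long configuredMb = config.getLong("taskmanager.memory.size", -1L); // MB; -1 = unset
    if (configuredMb > 0) {
      return configuredMb << 20; // an explicit setting always wins
    }
    return Runtime.getRuntime().maxMemory() / numComponents;
  }
}
{code}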

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink miniClusterUpdate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2669


commit 6487af737b57ca16190c0f4a6b63d4afd2af2b06
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing the RpcEndpoint.start/shutDown to throw exceptions will help to 
let rpc endpoints fail quickly without having to use a callback like the 
FatalErrorHandler.

commit cf7661aafb71649360ad6159f27a82433bfd8f75
Author: Till Rohrmann 
Date:   2016-10-17T14:03:02Z

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. 
The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds 
the MetricRegistry to the RM which can be used
to register metrics.

Apart from these changes, the PR restructures the code of the RM a little 
bit and fixes some blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of 
flink-runtime tests so that it is usable across multiple tests.

Introduce ResourceManagerRunner to handle errors in the ResourceManager

commit 61d328adb637c44889aa724c270c39657d1289c2
Author: Till Rohrmann 
Date:   2016-10-18T16:03:00Z

[FLINK-4853] [rm] Clean up job manager registration at the resource manager

Introduce the JobLeaderIdService which automatically retrieves the current 
job leader id. This job leader id is used to validate job manager 
registration attempts. Additionally, it is used to disconnect old job 
leaders from the resource manager.

Add comments

commit 58e8d6c06b55456793d2dffbee309e491d21d309
Author: Till Rohrmann 
Date:   2016-10-20T09:07:08Z

[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to 
MiniCluster

If the managed memory size for the task manager has not been set in the 
Configuration, it is automatically calculated by dividing the available 
memory by the number of distributed components. Additionally, this PR allows 
providing a MetricRegistry to the TaskManagerRunner. That way it is possible 
to use the MiniCluster's MetricRegistry.

Add memory calculation for task managers






[jira] [Commented] (FLINK-4581) Table API throws "No suitable driver found for jdbc:calcite"

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592877#comment-15592877
 ] 

ASF GitHub Bot commented on FLINK-4581:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2506
  
+1 to merge


> Table API throws "No suitable driver found for jdbc:calcite"
> 
>
> Key: FLINK-4581
> URL: https://issues.apache.org/jira/browse/FLINK-4581
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> It seems that in certain cases the internal Calcite JDBC driver cannot be 
> found. We should either try to get rid of the entire JDBC invocation or fix 
> this bug.
> From ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stream-sql-query-in-Flink-tp8928.html
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error.
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
>   at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>   at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>   at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: java.lang.RuntimeException: java.sql.SQLException: No suitable
> driver found for jdbc:calcite:
>   at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:151)
>   at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:106)
>   at org.apache.calcite.tools.Frameworks.withPlanner(Frameworks.java:127)
>   at
> org.apache.flink.api.table.FlinkRelBuilder$.create(FlinkRelBuilder.scala:56)
>   at
> org.apache.flink.api.table.TableEnvironment.(TableEnvironment.scala:73)
>   at
> org.apache.flink.api.table.StreamTableEnvironment.(StreamTableEnvironment.scala:58)
>   at
> org.apache.flink.api.java.table.StreamTableEnvironment.(StreamTableEnvironment.scala:45)
>   at
> org.apache.flink.api.table.TableEnvironment$.getTableEnvironment(TableEnvironment.scala:376)
>   at
> org.apache.flink.api.table.TableEnvironment.getTableEnvironment(TableEnvironment.scala)
>   at 
> org.myorg.quickstart.ReadingFromKafka2.main(ReadingFromKafka2.java:48)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>   ... 6 more
> Caused by: java.sql.SQLException: No suitable driver found for jdbc:calcite:
>   at java.sql.DriverManager.getConnection(DriverManager.java:689)
>   at java.sql.DriverManager.getConnection(DriverManager.java:208)
>   at org.apache.calcite.tools.Frameworks.withPrepare(Frameworks.java:144)
>   ... 20 more
> {code}





[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592866#comment-15592866
 ] 

Stefan Richter commented on FLINK-4867:
---

I have already pushed my code into https://github.com/StefanRRichter/RadixSort 
so that you can take a look. I will do some cleanup, more documentation, and 
tests if I find some more time to polish this. If you have questions about the 
code, just drop me an email.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how could a generated sorter be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; in the little-endian 
> case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
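
To make the first two points concrete, a sketch of what length- and
endianness-specialized generated code could look like (the MemorySegment
accessors are real; the specialized methods are hypothetical generated code
and assume a little-endian host and Java 8's Long.compareUnsigned):

{code}
import org.apache.flink.core.memory.MemorySegment;

final class GeneratedSortOps {

  // Generated compare for records whose normalized key is exactly 8 bytes:
  // one long read per side, one byte-order fix-up, no loop.
  static int compareFixed8(MemorySegment s1, int off1, MemorySegment s2, int off2) {
    long k1 = Long.reverseBytes(s1.getLong(off1)); // undo little-endian read
    long k2 = Long.reverseBytes(s2.getLong(off2));
    return Long.compareUnsigned(k1, k2);           // lexicographic byte order
  }

  // Generated swap for a fixed 16-byte record: fully unrolled, values pass
  // through registers, no UNSAFE.copyMemory and no temporary buffer.
  static void swapFixed16(MemorySegment s1, int off1, MemorySegment s2, int off2) {
    long a0 = s1.getLong(off1), a1 = s1.getLong(off1 + 8);
    long b0 = s2.getLong(off2), b1 = s2.getLong(off2 + 8);
    s1.putLong(off1, b0); s1.putLong(off1 + 8, b1);
    s2.putLong(off2, a0); s2.putLong(off2 + 8, a1);
  }
}
{code}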





[jira] [Commented] (FLINK-4705) Instrument FixedLengthRecordSorter

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592853#comment-15592853
 ] 

ASF GitHub Bot commented on FLINK-4705:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2617
  
I have started to take a look at this...

The `FixedLengthRecordSorter` and its interactions with the serializers / 
comparators were not very well tested before, hence not activated. I am trying 
to double-check that...



> Instrument FixedLengthRecordSorter
> --
>
> Key: FLINK-4705
> URL: https://issues.apache.org/jira/browse/FLINK-4705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> The {{NormalizedKeySorter}} sorts on the concatenation of (potentially 
> partial) keys plus an 8-byte pointer to the record. After sorting, each 
> pointer must be dereferenced, which is not cache friendly.
> The {{FixedLengthRecordSorter}} sorts on the concatenation of full keys 
> followed by the remainder of the record. The records can then be deserialized 
> in sequence.
> Instrumenting the {{FixedLengthRecordSorter}} requires implementing the 
> comparator methods {{writeWithKeyNormalization}} and 
> {{readWithKeyNormalization}}.
> Testing {{JaccardIndex}} on an m4.16xlarge the scale 18 runtime dropped from 
> 71.8 to 68.8 s (4.3% faster) and the scale 20 runtime dropped from 546.1 to 
> 501.8 s (8.8% faster).
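
A sketch of those two comparator hooks, as they might look inside a
hypothetical TypeComparator<IntPair> for a fixed-length two-int record (the
sign-bit flip makes signed ints sort correctly as unsigned normalized-key
bytes):

{code}
@Override
public void writeWithKeyNormalization(IntPair record, DataOutputView target) throws IOException {
  // key first, in an order-preserving unsigned encoding, then the payload
  target.writeInt(record.getKey() - Integer.MIN_VALUE);
  target.writeInt(record.getValue());
}

@Override
public IntPair readWithKeyNormalization(IntPair reuse, DataInputView source) throws IOException {
  reuse.setKey(source.readInt() + Integer.MIN_VALUE); // undo the sign-bit flip
  reuse.setValue(source.readInt());
  return reuse;
}
{code}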





[jira] [Created] (FLINK-4873) Add config option to specify "home directory" for YARN client resource sharing

2016-10-20 Thread Gyula Fora (JIRA)
Gyula Fora created FLINK-4873:
-

 Summary: Add config option to specify "home directory" for YARN 
client resource sharing
 Key: FLINK-4873
 URL: https://issues.apache.org/jira/browse/FLINK-4873
 Project: Flink
  Issue Type: Improvement
  Components: YARN Client
Affects Versions: 1.2.0, 1.1.3
Reporter: Gyula Fora


The YARN client currently uses FileSystem.getHomeDirectory() to store the jar 
files that need to be shared on the cluster. This pretty much forces users to 
run HDFS or something compatible with the Hadoop FS API on the cluster.

In some production environments, file systems (distributed or simply shared) are 
mounted under the same path and do not require the use of the Hadoop API for 
convenience. If we want to run Flink on YARN in these cases, we would need to be 
able to define the "home directory" where Flink should copy the files for 
sharing.

It could be something like:
yarn.resource.upload.dir in the flink-conf.yaml
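
A sketch of how such an option could be consulted (the key name follows the
proposal above; the class and helper are hypothetical):

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class UploadDirSketch {
  // Use the configured directory for shared YARN resources if present,
  // otherwise fall back to today's FileSystem.getHomeDirectory().
  static Path resolveUploadDir(Configuration flinkConfig, FileSystem fs) {
    String dir = flinkConfig.getString("yarn.resource.upload.dir", null);
    return dir != null ? new Path(dir) : fs.getHomeDirectory();
  }
}
{code}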





[GitHub] flink pull request #2659: [FLINK-4857] Remove throws clause from ZooKeeperUt...

2016-10-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2659




[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

2016-10-20 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2570
  
Rebased version looks good to me.




[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...

2016-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2657
  
Sorry @mxm. The compilation problems were introduced by an incomplete 
rebase. They should be fixed now.




[jira] [Commented] (FLINK-2491) Operators are not participating in state checkpointing in some cases

2016-10-20 Thread Eryn Dahlstedt (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592479#comment-15592479
 ] 

Eryn Dahlstedt commented on FLINK-2491:
---

We have been running into this problem as well. Do you have an example of this 
workaround? 

Thanks for any help on this. 

> Operators are not participating in state checkpointing in some cases
> 
>
> Key: FLINK-2491
> URL: https://issues.apache.org/jira/browse/FLINK-2491
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10.0
>Reporter: Robert Metzger
>Assignee: Márton Balassi
>Priority: Critical
> Fix For: 1.0.0
>
>
> While implementing a test case for the Kafka Consumer, I came across the 
> following bug:
> Consider the following topology, with the operator parallelism in parentheses:
> Source (2) --> Sink (1).
> In this setup, the {{snapshotState()}} method is called on the source, but 
> not on the sink.
> The sink receives the generated data.
> Only one of the two sources is generating data.
> I've implemented a test case for this, you can find it here: 
> https://github.com/rmetzger/flink/blob/para_checkpoint_bug/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ParallelismChangeCheckpoinedITCase.java





[jira] [Commented] (FLINK-4857) ZooKeeperUtils have a throws exception clause without throwing exceptions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592476#comment-15592476
 ] 

ASF GitHub Bot commented on FLINK-4857:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2659


> ZooKeeperUtils have a throws exception clause without throwing exceptions
> -
>
> Key: FLINK-4857
> URL: https://issues.apache.org/jira/browse/FLINK-4857
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Flink's {{ZooKeeperUtils}} contains functions which have a throws clause even 
> though they don't throw an {{Exception}}. This is wrong and should be fixed 
> by removing the throws clauses.
> Changing the ZooKeeperUtils will help to properly implement the 
> {{HighAvailabilityServices}} in the flip-6 branch, because the high 
> availability service methods don't have to throw exceptions then.
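
A before/after sketch of the change (hypothetical class, methods, and config
key; not actual ZooKeeperUtils signatures):

{code}
import org.apache.flink.configuration.Configuration;

final class ZooKeeperUtilsSketch {
  // before: a throws clause although nothing in the body can throw
  static String quorumBefore(Configuration config) throws Exception {
    return config.getString("recovery.zookeeper.quorum", "");
  }

  // after: same body, dead throws clause removed; callers (e.g. the
  // HighAvailabilityServices mentioned above) need no try/catch
  static String quorumAfter(Configuration config) {
    return config.getString("recovery.zookeeper.quorum", "");
  }
}
{code}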





[jira] [Closed] (FLINK-4857) ZooKeeperUtils have a throws exception clause without throwing exceptions

2016-10-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4857.

Resolution: Fixed

Fixed via 6f0faf9bb35e7cac3a38ed792cdabd6400fc4c79

> ZooKeeperUtils have a throws exception clause without throwing exceptions
> -
>
> Key: FLINK-4857
> URL: https://issues.apache.org/jira/browse/FLINK-4857
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Flink's {{ZooKeeperUtils}} contains functions which have a throws clause even 
> though they don't throw an {{Exception}}. This is wrong and should be fixed 
> by removing the throws clauses.
> Changing the ZooKeeperUtils will help to properly implement the 
> {{HighAvailabilityServices}} in the flip-6 branch, because the high 
> availability service methods don't have to throw exceptions then.





[GitHub] flink issue #2659: [FLINK-4857] Remove throws clause from ZooKeeperUtils fun...

2016-10-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2659
  
Thanks for the review @mxm. Will merge this PR.




[GitHub] flink issue #2628: [FLINK-3722] [runtime] Don't / and % when sorting

2016-10-20 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2628
  
> I transcribed Quicksort so as to remove considerations of Java 
performance and inlining. It was not clear to me that if we encapsulated the 
index, page number, and page offset into an object that Java would inline the 
various increment and decrement functions. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

OK, I would say that it is OK like this, but let's see what the others will 
say.
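
A sketch of the encapsulation being discussed (hypothetical class): page
number and page offset ride along with the flat index and are updated on
increment/decrement, so the hot path needs neither / nor %:

{code}
final class SortIndex {
  private final int recordsPerPage;
  private final int recordSize;

  int index;      // flat record index
  int page;       // memory page holding the record
  int offset;     // byte offset inside that page
  private int posInPage;

  SortIndex(int recordsPerPage, int recordSize) {
    this.recordsPerPage = recordsPerPage;
    this.recordSize = recordSize;
  }

  void increment() {
    index++;
    if (++posInPage == recordsPerPage) { // page boundary: reset, no division
      posInPage = 0;
      page++;
      offset = 0;
    } else {
      offset += recordSize;
    }
  }

  void decrement() {
    index--;
    if (--posInPage < 0) {
      posInPage = recordsPerPage - 1;
      page--;
      offset = (recordsPerPage - 1) * recordSize;
    } else {
      offset -= recordSize;
    }
  }
}
{code}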




[jira] [Commented] (FLINK-4857) ZooKeeperUtils have a throws exception clause without throwing exceptions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592452#comment-15592452
 ] 

ASF GitHub Bot commented on FLINK-4857:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2659
  
Thanks for the review @mxm. Will merge this PR.


> ZooKeeperUtils have a throws exception clause without throwing exceptions
> -
>
> Key: FLINK-4857
> URL: https://issues.apache.org/jira/browse/FLINK-4857
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Flink's {{ZooKeeperUtils}} contains functions which have a throws clause even 
> though they don't throw an {{Exception}}. This is wrong and should be fixed 
> by removing the throws clauses.
> Changing the ZooKeeperUtils will help to properly implement the 
> {{HighAvailabilityServices}} in the flip-6 branch, because the high 
> availability service methods don't have to throw exceptions then.





[jira] [Created] (FLINK-4872) Type erasure problem exclusively on cluster execution

2016-10-20 Thread Martin Junghanns (JIRA)
Martin Junghanns created FLINK-4872:
---

 Summary: Type erasure problem exclusively on cluster execution
 Key: FLINK-4872
 URL: https://issues.apache.org/jira/browse/FLINK-4872
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.1.2
Reporter: Martin Junghanns


The following code runs fine in the local and collection execution environments 
but fails when executed on a cluster.

{code:title=Problem.java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

import java.lang.reflect.Array;

public class Problem {

  public static class Pojo {
  }

  public static class Foo<T> extends Tuple1<T> {
  }

  public static class Bar<T> extends Tuple1<T[]> {
  }

  public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {

    private final Class<T> clazz;

    public UDF(Class<T> clazz) {
      this.clazz = clazz;
    }

    @Override
    public Bar<T> map(Foo<T> value) throws Exception {
      Bar<T> bar = new Bar<>();
      //noinspection unchecked
      bar.f0 = (T[]) Array.newInstance(clazz, 10);
      return bar;
    }
  }

  public static void main(String[] args) throws Exception {
    // runs in local, collection and cluster execution
    withLong();
    // runs in local and collection execution, fails on cluster execution
    withPojo();
  }

  public static void withLong() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Foo<Long> foo = new Foo<>();
    foo.f0 = 42L;
    DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));

    map.print();
  }

  public static void withPojo() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Foo<Pojo> foo = new Foo<>();
    foo.f0 = new Pojo();
    DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));

    map.print();
  }
}
{code}

{code:title=ProblemTest.java}
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ProblemTest extends MultipleProgramsTestBase {

  public ProblemTest(TestExecutionMode mode) {
super(mode);
  }

  @Test
  public void testWithLong() throws Exception {
Problem.withLong();
  }

  @Test
  public void testWithPOJO() throws Exception {
Problem.withPojo();
  }
}
{code}

Exception:
{code}
The return type of function 'withPojo(Problem.java:58)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.
org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
Problem.withPojo(Problem.java:60)
Problem.main(Problem.java:38) 
{code}
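
One way out, per the exception message (a sketch, assuming a Flink version
that ships org.apache.flink.api.common.typeinfo.TypeHint; alternatively UDF
can implement ResultTypeQueryable and build the TypeInformation from the
Class<T> passed to its constructor):

{code}
// Give an explicit result-type hint at the call site so nothing has to be
// inferred from the erased UDF generics on the cluster.
DataSet<Bar<Pojo>> map = barDataSource
    .map(new UDF<>(Pojo.class))
    .returns(new TypeHint<Bar<Pojo>>() {});
{code}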





[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592447#comment-15592447
 ] 

ASF GitHub Bot commented on FLINK-4853:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2657
  
Sorry @mxm. The compilation problems were introduced by an incomplete 
rebasing. They should be fixed now.


> Clean up JobManager registration at the ResourceManager
> ---
>
> Key: FLINK-4853
> URL: https://issues.apache.org/jira/browse/FLINK-4853
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The current {{JobManager}} registration at the {{ResourceManager}} blocks 
> threads in the {{RpcService.execute}} pool. This is not ideal and can be 
> avoided by not waiting on a {{Future}} in this call.
> I propose to encapsulate the leader id retrieval operation in a distinct 
> service so that it can be separated from the {{ResourceManager}}. This will 
> reduce the complexity of the {{ResourceManager}} and make the individual 
> components easier to test.
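
A sketch of such a distinct service (hypothetical names and
java.util.concurrent types; the actual interface in the PR may differ):

{code}
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;

// The ResourceManager asks for a job's current leader id asynchronously
// instead of blocking a thread from the RpcService.execute pool on a Future.
interface LeaderIdService {
  void addJob(JobID jobId) throws Exception;        // start tracking this job
  void removeJob(JobID jobId) throws Exception;     // stop tracking
  CompletableFuture<UUID> getLeaderId(JobID jobId); // completes on retrieval
}
{code}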





[jira] [Commented] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592437#comment-15592437
 ] 

ASF GitHub Bot commented on FLINK-4871:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2669

[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to 
MiniCluster

This PR is based on #2651, #2655 and #2657.

If the managed memory size for the task manager has not been set in the 
Configuration, it is automatically calculated by dividing the available 
memory by the number of distributed components. Additionally, this PR allows 
providing a MetricRegistry to the TaskManagerRunner. That way it is possible 
to use the MiniCluster's MetricRegistry.

Add memory calculation for task managers.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink miniClusterUpdate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2669


commit 6487af737b57ca16190c0f4a6b63d4afd2af2b06
Author: Till Rohrmann 
Date:   2016-10-17T14:22:16Z

[FLINK-4847] Let RpcEndpoint.start/shutDown throw exceptions

Allowing the RpcEndpoint.start/shutDown to throw exceptions will help to 
let rpc endpoints fail quickly without having to use a callback like the 
FatalErrorHandler.

commit cf7661aafb71649360ad6159f27a82433bfd8f75
Author: Till Rohrmann 
Date:   2016-10-17T14:03:02Z

[FLINK-4851] [rm] Introduce FatalErrorHandler and MetricRegistry to RM

This PR introduces a FatalErrorHandler and the MetricRegistry to the RM. 
The FatalErrorHandler is used to handle fatal errors. Additionally, the PR adds 
the MetricRegistry to the RM which can be used
to register metrics.

Apart from these changes, the PR restructures the code of the RM a little 
bit and fixes some blocking operations.

The PR also moves the TestingFatalErrorHandler into the util package of 
flink-runtime tests so that it is usable across multiple tests.

Introduce ResourceManagerRunner to handle errors in the ResourceManager

commit 61d328adb637c44889aa724c270c39657d1289c2
Author: Till Rohrmann 
Date:   2016-10-18T16:03:00Z

[FLINK-4853] [rm] Clean up job manager registration at the resource manager

Introduce the JobLeaderIdService which automatically retrieves the current 
job leader id.
This job leader id is used to validate job manager registration attempts. 
Additionally, it
is used to disconnect old job leaders from the resource manager.

Add comments

commit 58e8d6c06b55456793d2dffbee309e491d21d309
Author: Till Rohrmann 
Date:   2016-10-20T09:07:08Z

[FLINK-4871] [mini cluster] Add memory calculation for TaskManagers to 
MiniCluster

If the managed memory size for the task manager has not been set in the 
Configuration, then
it is automatically calculated by dividing the available memory by the 
number of distributed
components. Additionally, this PR allows providing a MetricRegistry to the 
TaskManagerRunner.
That way it is possible to use the MiniCluster's MetricRegistry.

Add memory calculation for task managers




> Add memory calculation for TaskManagers and forward MetricRegistry
> --
>
> Key: FLINK-4871
> URL: https://issues.apache.org/jira/browse/FLINK-4871
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Add automatic memory calculation for {{TaskManagers}} executed by the 
> {{MiniCluster}}. 
> Additionally, change the {{TaskManagerRunner}} to accept a given 
> {{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is 
> used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4871) Add memory calculation for TaskManagers and forward MetricRegistry

2016-10-20 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4871:


 Summary: Add memory calculation for TaskManagers and forward 
MetricRegistry
 Key: FLINK-4871
 URL: https://issues.apache.org/jira/browse/FLINK-4871
 Project: Flink
  Issue Type: Sub-task
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


Add automatic memory calculation for {{TaskManagers}} executed by the 
{{MiniCluster}}. 

Additionally, change the {{TaskManagerRunner}} to accept a given 
{{MetricRegistry}} so that the one instantiated by the {{MiniCluster}} is used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2668: Add EvaluateDataSetOperation for LabeledVector. Th...

2016-10-20 Thread tfournier314
GitHub user tfournier314 opened a pull request:

https://github.com/apache/flink/pull/2668

Add EvaluateDataSetOperation for LabeledVector. This closes #4865

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tfournier314/flink LabeledDataInEvaluate

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2668.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2668






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592359#comment-15592359
 ] 

Gabor Gevay commented on FLINK-4867:


Hm, this sounds quite interesting. Could you please share the code?

> As it is radix based and not comparison based, it would require some way to 
> expose partial sort keys instead of a compareTo method

Doesn't the normalized key support that we already have solve this part?

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay reassigned FLINK-4867:
--

Assignee: Gabor Gevay

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592367#comment-15592367
 ] 

Gabor Gevay commented on FLINK-4867:


That's nice, thanks!

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2016-10-20 Thread Philipp von dem Bussche (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592357#comment-15592357
 ] 

Philipp von dem Bussche commented on FLINK-2821:


Thank you for all your effort [~mxm]. I have tested this and was able to 
connect a jobmanager and a task manager in a docker-machine environment on my 
Mac as well as in Rancher. For the Rancher setup to work, though, I had to set 
the bind address to 0.0.0.0. I think this makes sense since Rancher 
introduces an additional 10.x address (on top of the 172.x address given by 
Docker), but when specifying the hostname as the bind address it would only bind 
to the 172.x address.
One other thing I realized was that my local flink cli on my Mac would 
not work together with your custom build anymore because of version 
discrepancies. I felt this is quite harsh given that I am running 1.1.3 on 
both sides, but obviously different builds.
I will play around with this a bit more and send some data across and let you 
know if I see anything else popping up.
Thanks again !

> Change Akka configuration to allow accessing actors from different URLs
> ---
>
> Key: FLINK-2821
> URL: https://issues.apache.org/jira/browse/FLINK-2821
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Reporter: Robert Metzger
>Assignee: Maximilian Michels
>
> Akka expects the actor's URL to be exactly matching.
> As pointed out here, cases where users were complaining about this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html
>   - Proxy routing (as described here, send to the proxy URL, receiver 
> recognizes only original URL)
>   - Using hostname / IP interchangeably does not work (we solved this by 
> always putting IP addresses into URLs, never hostnames)
>   - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still 
> no solution to that (but seems not too much of a restriction)
> I am aware that this is not possible due to Akka, so it is actually not a 
> Flink bug. But I think we should track the resolution of the issue here 
> anyways because it's affecting our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4870) ContinuousFileMonitoringFunction does not properly handle absolute Windows paths

2016-10-20 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-4870:
---

 Summary: ContinuousFileMonitoringFunction does not properly handle 
absolute Windows paths
 Key: FLINK-4870
 URL: https://issues.apache.org/jira/browse/FLINK-4870
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.1.2
Reporter: Chesnay Schepler
Priority: Minor
 Fix For: 1.2.0


The ContinuousFileMonitoringFunction fails for absolute Windows paths without 
a dedicated scheme (e.g. "C:\\tmp\\test.csv"), since the String path is directly 
fed into the URI constructor (which doesn't handle it properly) instead of 
first creating a Flink Path and converting that into a URI.
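
For illustration, a minimal sketch of the difference (the exception behaviour of 
java.net.URI is standard; the normalization through Flink's Path is the fix the 
issue describes):

{code}
import java.net.URI;
import org.apache.flink.core.fs.Path;

public class WindowsPathExample {
    public static void main(String[] args) throws Exception {
        String p = "C:\\tmp\\test.csv";

        // Creating a Flink Path first normalizes the Windows path
        // before it is converted into a URI.
        URI viaPath = new Path(p).toUri();
        System.out.println(viaPath);

        // Feeding the raw string into the URI constructor fails:
        // "C:" parses as a scheme and the backslashes are illegal in a URI.
        URI direct = new URI(p); // throws URISyntaxException
    }
}
{code}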



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4869) Store record pointer after record keys

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4869:
---
Labels: performance  (was: )

> Store record pointer after record keys
> --
>
> Key: FLINK-4869
> URL: https://issues.apache.org/jira/browse/FLINK-4869
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> {{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} 
> separate from the memory segments used for the sort keys. By storing the 
> pointer after the sort keys, the addition of the offset is moved from 
> {{NormalizedKeySorter.compare}}, which is O(n log n), to other methods which 
> are O\(n).
> Will run a performance comparison before submitting a PR to see how significant 
> a performance improvement this would yield.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4860) Sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4860:
---
Component/s: Local Runtime

> Sort performance
> 
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Greg Hogan
>  Labels: performance
>
> A super-task for improvements to Flink's sort performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4860) Sort performance

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4860:
---
Labels: performance  (was: )

> Sort performance
> 
>
> Key: FLINK-4860
> URL: https://issues.apache.org/jira/browse/FLINK-4860
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Greg Hogan
>  Labels: performance
>
> A super-task for improvements to Flink's sort performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4869) Store record pointer after record keys

2016-10-20 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4869:
-

 Summary: Store record pointer after record keys
 Key: FLINK-4869
 URL: https://issues.apache.org/jira/browse/FLINK-4869
 Project: Flink
  Issue Type: Sub-task
  Components: Core
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


{{NormalizedKeySorter}} serializes records into a {{RandomAccessInputView}} 
separate from the memory segments used for the sort keys. By storing the 
pointer after the sort keys, the addition of the offset is moved from 
{{NormalizedKeySorter.compare}}, which is O(n log n), to other methods which are 
O\(n).

Will run a performance comparison before submitting a PR to see how significant a 
performance improvement this would yield.
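
In other words, a hedged sketch of the layout change (POINTER_LEN and the 
offsets are illustrative; only the MemorySegment.compare signature is real):

{code}
// current layout per sort slot:  [ pointer (8 bytes) | normalized key (keyLen bytes) ]
// every compare pays the offset addition, O(n log n) times:
int cmp = segA.compare(segB, offA + POINTER_LEN, offB + POINTER_LEN, keyLen);

// proposed layout per sort slot: [ normalized key (keyLen bytes) | pointer (8 bytes) ]
// compare needs no offset addition:
int cmp2 = segA.compare(segB, offA, offB, keyLen);
// only the O(n) pointer reads and writes pay "+ keyLen" instead.
{code}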



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292
 ] 

Stefan Richter commented on FLINK-4867:
---

If sort performance is crucial to you, I wrote an in-place radix sort 
algorithm that was extremely fast for me in a previous project. On primitive 
types and serialized byte strings I found it typically a factor of 2-3x faster 
than JDK's Arrays.sort() for primitives (or multikey quicksort for Strings). I was 
considering porting it to Flink, but did not find the time yet. As it is 
radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method. If that is interesting to you, 
let me know and I can share the original code.
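
For illustration, a minimal sketch of what "exposing partial sort keys" could 
look like for a byte-wise radix sort (the interface is hypothetical, neither 
Flink API nor the original code):

{code}
// A radix sort would consult key bytes digit by digit instead of compareTo.
interface RadixKeyAccessor<T> {
    /** Returns the unsigned key byte at position digit, or -1 past the key end. */
    int keyByteAt(T record, int digit);
}
{code}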

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592292#comment-15592292
 ] 

Stefan Richter edited comment on FLINK-4867 at 10/20/16 4:32 PM:
-

If sort performance is crucial to you, I wrote an in-place radix sort 
algorithm that was extremely fast for me in a previous project. On primitive 
types and serialized byte strings I found it typically a factor of 2-3x faster 
than JDK's Arrays.sort() for primitives (or multikey quicksort for Strings). I was 
considering porting it to Flink, but did not find the time yet. As it is 
radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method. If that is interesting to you, 
let me know and I can share the original code.


was (Author: srichter):
If you sort performance is crucial to you, I wrote some inplace radix sort 
algorithm that was extremely fast for me in a precious project. On primitives 
types and serialized byte strings I found it typically factor 2-3x faster than 
JDK's Arrays.sort() for primitives or multikey quicksort for Strings). I was 
considering to port it onto Flink, but did not find the time yet. As it is 
radix based and not comparison based, it would require some way to expose 
partial sort keys instead of a compareTo method . If that is interesting to you 
let me know and I can share the original code.

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592246#comment-15592246
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/2570
  
Rebased version looks good to me.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, 
> EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592239#comment-15592239
 ] 

Greg Hogan commented on FLINK-4867:
---

It will be very interesting to see the results of this project. Perhaps you 
should self-assign the ticket until it can be handed over?

Inline status is logged with the JVM arguments {{-XX:+UnlockDiagnosticVMOptions 
-XX:+PrintInlining}}.
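
For example, a typical invocation (jar and class names are illustrative only):

{code}
java -XX:+UnlockDiagnosticVMOptions -XX:+PrintInlining \
     -cp benchmarks.jar SortBenchmark 2>&1 \
  | grep -E "MemorySegment::(compare|swapBytes)"
{code}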

> Investigate code generation for improving sort performance
> --
>
> Key: FLINK-4867
> URL: https://issues.apache.org/jira/browse/FLINK-4867
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This issue is for investigating whether code generation could speed up 
> sorting. We should make some performance measurements on hand-written code 
> that is similar to what we could generate, to see whether investing more time 
> into this is worth it. If we find that it is worth it, we can open a second 
> Jira for the actual implementation of the code generation.
> I think we could generate one class at places where we currently instantiate 
> {{QuickSort}}. This generated class would include the functionality of 
> {{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
> {{MemorySegment.compare}}, and {{MemorySegment.swap}}.
> Btw. I'm planning to give this as a student project at a TU Berlin course in 
> the next few months.
> Some concrete ideas about how a generated sorter could be faster than the 
> current sorting code:
> * {{MemorySegment.compare}} could be specialized for
> ** Length: for small records, the loop could be unrolled
> ** Endianness (currently it is optimized for big endian; and in the little 
> endian case (e.g. x86) it does a Long.reverseBytes for each long read)
> * {{MemorySegment.swapBytes}}
> ** In case of small records, using three {{UNSAFE.copyMemory}} is probably 
> not as efficient as a specialized swap, because
> *** We could use total loop unrolling in generated code (because we know the 
> exact record size)
> *** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
> *** We will only need 2/3 the memory bandwidth, because the temporary storage 
> could be a register if we swap one byte (or one {{long}}) at a time
> ** several checks might be eliminated
> * Better inlining behaviour could be achieved 
> ** Virtual function calls to the methods of {{InMemorySorter}} could be 
> eliminated. (Note, that these are problematic to devirtualize by the JVM if 
> there are different derived classes used in a single Flink job (see \[8,7\]).)
> ** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
> excessive checks make it too large
> ** {{MemorySegment.compare}} is probably also not inlined currently, because 
> those two while loops are too large
> \[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
> long, Object, long, long)
> \[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
> \[8\] 
> http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
> \[9\] 
> http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-4868:
---
Description: 
This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is a quite hot code path, as it runs every time 
we reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)

Note that the threshold for switching to the insertion sort could probably be 
increased after this.

  was:
This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is quite a hot code as it runs every time when 
we are at the bottom of the quick sort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)


> Insertion sort could avoid the swaps
> 
>
> Key: FLINK-4868
> URL: https://issues.apache.org/jira/browse/FLINK-4868
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: performance
>
> This is about the fallback to insertion sort at the beginning of 
> {{QuickSort.sortInternal}}. It is a quite hot code path, as it runs every time 
> we reach the bottom of the quicksort recursion tree.
> The inner loop does a series of swaps on adjacent elements for moving a block 
> of several elements one slot to the right and inserting the ith element at 
> the hole. However, it would be faster to first copy the ith element to a temp 
> location, and then move the block of elements to the right without swaps, and 
> then copy the original ith element to the hole.
> Moving the block of elements without swaps could be achieved by calling 
> {{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
> calls in {{MemorySegment.swap}} currently being done).
> (Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
> memcpy, so I'm not sure if we can do the entire block of elements with maybe 
> even one {{UNSAFE.copyMemory}}.)
> Note that the threshold for switching to the insertion sort could probably be 
> increased after this.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4868) Insertion sort could avoid the swaps

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4868:
--

 Summary: Insertion sort could avoid the swaps
 Key: FLINK-4868
 URL: https://issues.apache.org/jira/browse/FLINK-4868
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This is about the fallback to insertion sort at the beginning of 
{{QuickSort.sortInternal}}. It is a quite hot code path, as it runs every time 
we reach the bottom of the quicksort recursion tree.

The inner loop does a series of swaps on adjacent elements for moving a block 
of several elements one slot to the right and inserting the ith element at the 
hole. However, it would be faster to first copy the ith element to a temp 
location, and then move the block of elements to the right without swaps, and 
then copy the original ith element to the hole.

Moving the block of elements without swaps could be achieved by calling 
{{UNSAFE.copyMemory}} only once for every element (as opposed to the three 
calls in {{MemorySegment.swap}} currently being done).

(Note that I'm not sure whether {{UNSAFE.copyMemory}} is like memmove or like 
memcpy, so I'm not sure if we can do the entire block of elements with maybe 
even one {{UNSAFE.copyMemory}}.)
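
For illustration, a minimal sketch of the idea on a plain int array (the real 
code would move fixed-size records between MemorySegments with 
{{UNSAFE.copyMemory}} instead of array assignments):

{code}
// Current style: three moves per step (swap of adjacent elements).
static void insertWithSwaps(int[] a, int i) {
    for (int j = i; j > 0 && a[j - 1] > a[j]; j--) {
        int tmp = a[j]; a[j] = a[j - 1]; a[j - 1] = tmp;
    }
}

// Proposed style: copy the ith element out once, shift the block right
// with one move per step, and drop the element into the hole.
static void insertWithHole(int[] a, int i) {
    int tmp = a[i];
    int j = i;
    while (j > 0 && a[j - 1] > tmp) {
        a[j] = a[j - 1];
        j--;
    }
    a[j] = tmp;
}
{code}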



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592186#comment-15592186
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2628
  
> I transcribed Quicksort so as to remove considerations of Java 
performance and inlining. It was not clear to me that if we encapsulated the 
index, page number, and page offset into an object that Java would inline the 
various increment and decrement functions. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

OK, I would say it is fine like this, but let's see what the others will say.


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
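
As an illustration, a minimal sketch of the incremental stepping (class and 
method names are illustrative, not the proposed API):

{code}
// Tracks (segment, record) for a logical index without any division.
final class SortIndexIterator {
    private final int recordsPerSegment; // the divisor
    private int segment;                 // the quotient
    private int record;                  // the modulo

    SortIndexIterator(int recordsPerSegment) {
        this.recordsPerSegment = recordsPerSegment;
    }

    void increment() {
        if (++record == recordsPerSegment) { // wrap around
            record = 0;
            segment++;
        }
    }

    void decrement() {
        if (record == 0) {                   // wrap around
            record = recordsPerSegment;
            segment--;
        }
        record--;
    }
}
{code}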



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4867) Investigate code generation for improving sort performance

2016-10-20 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-4867:
--

 Summary: Investigate code generation for improving sort performance
 Key: FLINK-4867
 URL: https://issues.apache.org/jira/browse/FLINK-4867
 Project: Flink
  Issue Type: Sub-task
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Minor


This issue is for investigating whether code generation could speed up sorting. 
We should make some performance measurements on hand-written code that is 
similar to what we could generate, to see whether investing more time into this 
is worth it. If we find that it is worth it, we can open a second Jira for the 
actual implementation of the code generation.

I think we could generate one class at places where we currently instantiate 
{{QuickSort}}. This generated class would include the functionality of 
{{QuickSort}}, {{NormalizedKeySorter}} or {{FixedLengthRecordSorter}}, 
{{MemorySegment.compare}}, and {{MemorySegment.swap}}.

Btw. I'm planning to give this as a student project at a TU Berlin course in 
the next few months.

Some concrete ideas about how a generated sorter could be faster than the 
current sorting code:
* {{MemorySegment.compare}} could be specialized for
** Length: for small records, the loop could be unrolled
** Endianness (currently it is optimized for big endian; and in the little 
endian case (e.g. x86) it does a Long.reverseBytes for each long read)
* {{MemorySegment.swapBytes}}
** In case of small records, using three {{UNSAFE.copyMemory}} is probably not 
as efficient as a specialized swap, because
*** We could use total loop unrolling in generated code (because we know the 
exact record size)
*** {{UNSAFE.copyMemory}} checks for alignment first \[6,9\]
*** We will only need 2/3 the memory bandwidth, because the temporary storage 
could be a register if we swap one byte (or one {{long}}) at a time
** several checks might be eliminated
* Better inlining behaviour could be achieved 
** Virtual function calls to the methods of {{InMemorySorter}} could be 
eliminated. (Note, that these are problematic to devirtualize by the JVM if 
there are different derived classes used in a single Flink job (see \[8,7\]).)
** {{MemorySegment.swapBytes}} is probably not inlined currently, because the 
excessive checks make it too large
** {{MemorySegment.compare}} is probably also not inlined currently, because 
those two while loops are too large

\[6\] http://www.docjar.com/docs/api/sun/misc/Unsafe.html#copyMemory(Object, 
long, Object, long, long)
\[7\] https://shipilev.net/blog/2015/black-magic-method-dispatch/
\[8\] 
http://insightfullogic.com/2014/May/12/fast-and-megamorphic-what-influences-method-invoca/
\[9\] 
http://hg.openjdk.java.net/jdk8/jdk8/hotspot/file/87ee5ee27509/src/cpu/x86/vm/stubGenerator_x86_64.cpp#l2409
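
For example, a hedged sketch of what a generated compare, specialized for a 
fixed 8-byte key on a little-endian machine, could look like (illustrative 
only, not actual generated code):

{code}
import org.apache.flink.core.memory.MemorySegment;

// Fully unrolled: the key length is known at generation time, so there is
// no loop and no per-iteration bounds logic.
static int compareFixed8(MemorySegment a, MemorySegment b, int offA, int offB) {
    // reverseBytes restores big-endian order so the longs compare like
    // the underlying byte sequences
    long x = Long.reverseBytes(a.getLong(offA));
    long y = Long.reverseBytes(b.getLong(offB));
    return Long.compareUnsigned(x, y);
}
{code}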



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2570: [FLINK-3674] Add an interface for Time aware User Functio...

2016-10-20 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2570
  
I updated this on top of the latest master with @StefanRRichter's state 
changes.

Please take another look, @StefanRRichter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4844.
-
   Resolution: Implemented
Fix Version/s: 1.2.0

> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4842) Introduce test to enforce order of operator / udf lifecycles

2016-10-20 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-4842.
-
   Resolution: Implemented
Fix Version/s: 1.2.0

> Introduce test to enforce order of operator / udf lifecycles 
> -
>
> Key: FLINK-4842
> URL: https://issues.apache.org/jira/browse/FLINK-4842
> Project: Flink
>  Issue Type: Test
>Reporter: Stefan Richter
>Assignee: Stefan Richter
> Fix For: 1.2.0
>
>
> We should introduce a test that enforces a certain order in which lifecycle 
> methods of operators and UDFs are called, so that the order is not easily 
> changed by accident.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2662: [FLINK-4824] [client] CliFrontend shows misleading error ...

2016-10-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2662
  
@mxm thanks for the review. I added a second commit which I think satisfies 
your request. When no job is executed, the message is printed to stderr 
without a stacktrace.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4844) Partitionable Raw Keyed/Operator State

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592051#comment-15592051
 ] 

ASF GitHub Bot commented on FLINK-4844:
---

Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2648


> Partitionable Raw Keyed/Operator State
> --
>
> Key: FLINK-4844
> URL: https://issues.apache.org/jira/browse/FLINK-4844
> Project: Flink
>  Issue Type: New Feature
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>
> Partitionable operator and keyed state are currently only available by using 
> backends. However, the serialization code for many operators is built around 
> reading/writing their state to a stream for checkpointing. We want to provide 
> partitionable states also through streams, so that migrating existing 
> operators becomes easier.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2648: [FLINK-4844] Partitionable Raw Keyed/Operator Stat...

2016-10-20 Thread StefanRRichter
Github user StefanRRichter closed the pull request at:

https://github.com/apache/flink/pull/2648


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3674) Add an interface for Time aware User Functions

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592052#comment-15592052
 ] 

ASF GitHub Bot commented on FLINK-3674:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2570
  
I updated this on top of the latest master with @StefanRRichter's state 
changes.

Please take another look, @StefanRRichter.


> Add an interface for Time aware User Functions
> --
>
> Key: FLINK-3674
> URL: https://issues.apache.org/jira/browse/FLINK-3674
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
> void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, 
> EventTimeFunction {
> private long currentEventTime = Long.MIN_VALUE;
> public String map(String value) {
> return value + " @ " + currentEventTime;
> }
> public void onWatermark(Watermark watermark) {
> currentEventTime = watermark.getTimestamp();
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4824) CliFrontend shows misleading error message when main() method returns before env.execute()

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15592027#comment-15592027
 ] 

ASF GitHub Bot commented on FLINK-4824:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2662
  
@mxm thanks for the review. I added a second commit which I think satisfies 
your request. When no job is executed, the message is printed to stderr 
without a stacktrace.


> CliFrontend shows misleading error message when main() method returns before 
> env.execute()
> --
>
> Key: FLINK-4824
> URL: https://issues.apache.org/jira/browse/FLINK-4824
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Greg Hogan
>
> While testing Flink by running the 
> {{./examples/streaming/SocketWindowWordCount.jar}} example, I got the 
> following error message:
> {code}
> ./bin/flink run ./examples/streaming/SocketWindowWordCount.jar 
> Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
> Using address 127.0.0.1:6123 to connect to JobManager.
> JobManager web interface address http://127.0.0.1:8081
> Starting execution of program
> No port specified. Please run 'SocketWindowWordCount --port ', where 
> port is the address of the text server
> To start a simple text server, run 'netcat -l ' and type the input text 
> into the command line
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> didn't contain Flink jobs. Perhaps you forgot to call execute() on the 
> execution environment.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:324)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:985)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1032)
>   at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:1029)
>   at 
> org.apache.flink.runtime.security.SecurityContext$1.run(SecurityContext.java:82)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at 
> org.apache.flink.runtime.security.SecurityContext.runSecured(SecurityContext.java:79)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1029)
> {code}
> I think the error message is misleading, because I tried executing a valid 
> Flink job.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements

2016-10-20 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591955#comment-15591955
 ] 

Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM:
---

Hello,
I think that 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} 
also needs to change, because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
only called if there are elements in inputData.
{code}
TypeSerializer<IN> inSerializer = 
getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = 
getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
IN inCopy = inSerializer.copy(element);
OUT out = function.map(inCopy);
result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is edited, for example, as
{code}
override def initiate(partial: Row): Unit = {
partial.setField(sumIndex, 0.asInstanceOf[T]) // cast 0 to the type parameter 
T of the concrete SumAggregate[T] subclass
  }
{code}
then the following tests will pass
{code}
 @Test
  def testSumNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2l,1D,1f,1,1:Short ),
  (2: Byte, 2l,1D,1f,1,1:Short ))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0.0,0.0,0,0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

@Test
  def testCountNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2l,1D,1f,1,1:Short ),
  (2: Byte, 2l,1D,1f,1,1:Short ))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0,0,0,0"
val results = result.toDataSet[Row].collect()

TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}


was (Author: anmu):
Hello,
I think we also need to change 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}, 
because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
only called when there are elements in inputData.
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
	IN inCopy = inSerializer.copy(element);
	OUT out = function.map(inCopy);
	result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is edited, for example as
{code}
override def initiate(partial: Row): Unit = {
  // cast 0 to the numeric type T of the concrete SumAggregate[T] subclass
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following test will pass:
{code}
@Test
  def testDataSetAggregation(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT sum(_1) FROM MyTable"

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "231"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  @Test
  def testSumNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2L, 1D, 1F, 1, 1: Short),
  (2: Byte, 2L, 1D, 1F, 1, 1: Short))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0.0,0.0,0,0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84276796
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -43,16 +41,18 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
+import static 
org.apache.flink.streaming.api.functions.source.RichFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The operator that reads the {@link FileInputSplit splits} received from 
the preceding
+ * The operator that reads the {@link RichFileInputSplit splits} received 
from the preceding
  * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link 
ContinuousFileMonitoringFunction}
  * which has a parallelism of 1, this operator can have DOP > 1.
  * 
--- End diff --

Generic types are not documented in the JavaDoc.
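
For example, a line like the following could be added to the class-level 
JavaDoc (illustrative wording only):

```java
 * @param <S> the type of the checkpointed state of an input split
```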


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288975
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -347,34 +328,17 @@ public void run() {
}
}
 
-	private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
-		List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
-		for (FileInputSplit split: this.pendingSplits) {
-			snapshot.add(split);
-		}
-
-		// remove the current split from the list if inside.
-		if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
-			this.pendingSplits.remove();
-		}
-
-		if (this.currentSplit != null) {
-			if (this.format instanceof CheckpointableInputFormat) {
-				@SuppressWarnings("unchecked")
-				CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-					(CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-				S formatState = this.isSplitOpen ?
-					checkpointableFormat.getCurrentState() :
-					restoredFormatState;
-				return new Tuple3<>(snapshot, currentSplit, formatState);
-			} else {
-				LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
-				return new Tuple3<>(snapshot, currentSplit, null);
+	private List<RichFileInputSplit> getReaderState() throws IOException {
+		List<RichFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
+		if (currentSplit != null) {
+			if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
+				S formatState = ((CheckpointableInputFormat<RichFileInputSplit, S>) this.format).getCurrentState();
--- End diff --

```java
Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState();
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4832) Count/Sum 0 elements

2016-10-20 Thread Anton Mushin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591955#comment-15591955
 ] 

Anton Mushin commented on FLINK-4832:
-

Hello,
I think we also need to change 
{{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}}, 
because 
{{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} is 
only called when there are elements in inputData.
{code}
TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (IN element : inputData) {
	IN inCopy = inSerializer.copy(element);
	OUT out = function.map(inCopy);
	result.add(outSerializer.copy(out));
}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} 
is edited, for example as
{code}
override def initiate(partial: Row): Unit = {
  // cast 0 to the numeric type T of the concrete SumAggregate[T] subclass
  partial.setField(sumIndex, 0.asInstanceOf[T])
}
{code}
then the following test will pass:
{code}
@Test
  def testDataSetAggregation(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery = "SELECT sum(_1) FROM MyTable"

val ds = CollectionDataSets.get3TupleDataSet(env)
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "231"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  @Test
  def testSumNullElements(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
"FROM (select * from MyTable where _1 = 4)"

val ds = env.fromElements(
  (1: Byte, 2L, 1D, 1F, 1, 1: Short),
  (2: Byte, 2L, 1D, 1F, 1, 1: Short))
tEnv.registerDataSet("MyTable", ds)

val result = tEnv.sql(sqlQuery)

val expected = "0,0,0.0,0.0,0,0"
val results = result.toDataSet[Row].collect()
TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

> Count/Sum 0 elements
> 
>
> Key: FLINK-4832
> URL: https://issues.apache.org/jira/browse/FLINK-4832
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>
> Currently, the Table API is unable to count or sum up 0 elements. We should 
> improve DataSet aggregations for this. Maybe by union the original DataSet 
> with a dummy record or by using a MapPartition function. Coming up with a 
> good design for this is also part of this issue.
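
A minimal sketch of the MapPartition idea mentioned above (illustrative 
only; the class name is an assumption, and a single global count would 
require running it with parallelism 1):
{code}
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.util.Collector;

// Emits the element count of its partition, including 0 for an empty
// partition, unlike a reduce-based count, which emits nothing.
public class CountWithDefault<T> implements MapPartitionFunction<T, Long> {
	@Override
	public void mapPartition(Iterable<T> values, Collector<Long> out) {
		long count = 0L;
		for (T ignored : values) {
			count++;
		}
		out.collect(count);
	}
}
{code}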



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84290827
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

Sure, makes sense since those ports are not reachable by TaskManagers 
running in different containers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
Github user sedgewickmm18 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84291047
  
--- Diff: flink-contrib/docker-flink/docker-compose.sh ---
@@ -0,0 +1,4 @@
+#!/bin/sh
--- End diff --

that's fine


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2628: [FLINK-3722] [runtime] Don't / and % when sorting

2016-10-20 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2628
  
Thanks @ggevay for reviewing. I added a commit with additional comments.

I transcribed `Quicksort` so as to remove considerations of Java 
performance and inlining. It was not clear to me that Java would inline the 
various increment and decrement functions if we encapsulated the index, page 
number, and page offset into an object. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

I think this is the best time to investigate alternative methods. I'm not 
seeing how one would sort on top of `InMemorySorter` without deserializing 
records.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84279968
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -189,7 +186,7 @@ public void close() throws Exception {
output.close();
}
 
-   private class SplitReader extends Thread {
+   private final class SplitReader extends Thread {
--- End diff --

Making private classes final is not really necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84281147
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+	extends FileInputSplit implements Comparable<RichFileInputSplit> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the 
stream of splits.*/
+   public static final RichFileInputSplit EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, 
null);
--- End diff --

Is it really necessary to have this special split? Couldn't you just have a 
`reader.stop()` method which stops the reader after the current split has been 
processed?
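
For illustration, a rough sketch of that alternative (assumed names, not 
the actual SplitReader API):

```java
// A volatile flag checked between splits, so the split currently being
// read is always processed to completion before the reader exits.
final class StoppableSplitReader implements Runnable {

	private volatile boolean running = true;

	/** Asks the reader to exit once the current split has been processed. */
	void stop() {
		running = false;
	}

	@Override
	public void run() {
		while (running) {
			// dequeue the next pending split and read it fully here
		}
	}
}
```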


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84284953
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/RichFileInputSplitTest.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.RichFileInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RichFileInputSplitTest {
+
+   @Test
+   public void testSplitEquality() {
+
+   RichFileInputSplit eos1 = RichFileInputSplit.EOS;
+   RichFileInputSplit eos2 = RichFileInputSplit.EOS;
+
+   Assert.assertEquals(eos1, eos2);
+
+   FileInputSplit firstSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, 
firstSplit);
+   Assert.assertNotEquals(eos1, richFirstSplit);
+   Assert.assertNotEquals(richFirstSplit, firstSplit);
+
+   FileInputSplit secondSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, 
secondSplit);
+   Assert.assertEquals(richFirstSplit, richSecondSplit);
+   Assert.assertNotEquals(richFirstSplit, firstSplit);
+
+   FileInputSplit modSecondSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   RichFileInputSplit richModSecondSplit = new 
RichFileInputSplit(11, modSecondSplit);
+   Assert.assertNotEquals(richSecondSplit, richModSecondSplit);
+
+   FileInputSplit thirdSplit = new FileInputSplit(2, new 
Path("test/test1"), 0, 100, null);
+   RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, 
thirdSplit);
+   Assert.assertEquals(richThirdSplit.getModificationTime(), 10);
+   Assert.assertNotEquals(richFirstSplit, richThirdSplit);
+
+   FileInputSplit thirdSplitCopy = new FileInputSplit(2, new 
Path("test/test1"), 0, 100, null);
+   RichFileInputSplit richThirdSplitCopy = new 
RichFileInputSplit(10, thirdSplitCopy);
+   Assert.assertEquals(richThirdSplitCopy, richThirdSplit);
+   }
+
+   @Test
+   public void testSplitComparison() {
+   FileInputSplit firstSplit = new FileInputSplit(3, new 
Path("test/test1"), 0, 100, null);
+   RichFileInputSplit richFirstSplit = new RichFileInputSplit(10, 
firstSplit);
+
+   FileInputSplit secondSplit = new FileInputSplit(2, new 
Path("test/test2"), 0, 100, null);
+   RichFileInputSplit richSecondSplit = new RichFileInputSplit(10, 
secondSplit);
+
+   FileInputSplit thirdSplit = new FileInputSplit(1, new 
Path("test/test2"), 0, 100, null);
+   RichFileInputSplit richThirdSplit = new RichFileInputSplit(10, 
thirdSplit);
+
+   FileInputSplit forthSplit = new FileInputSplit(0, new 
Path("test/test3"), 0, 100, null);
+   RichFileInputSplit richForthSplit = new RichFileInputSplit(11, 
forthSplit);
+
+   // lexicographically on the path order
+   Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 
0);
+   Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+   // same mod time, same file so smaller split number first
+   Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 
0);
+
+   // smaller modification time first
+   Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0);
+   }
+
+   @Test
+   public void testIllegalArgument() {
+   try {
+   FileInputSplit firstSplit = new FileInputSplit(2, new 
Path("test"), 0, 100, null);
+   new 

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288776
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+	extends FileInputSplit implements Comparable<RichFileInputSplit> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the 
stream of splits.*/
+   public static final RichFileInputSplit EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, 
null);
+
+   /**
+* Creates a {@link RichFileInputSplit} based on the file modification 
time and
+* the rest of the information of the {@link FileInputSplit}, as 
returned by the
+* underlying filesystem.
+*
+* @param modificationTime  the modification time of the file this 
split belongs to
+* @param split the rest of the information about the split
+*/
+   public RichFileInputSplit(long modificationTime, FileInputSplit split) {
+   this(modificationTime,
+   split.getSplitNumber(),
+   split.getPath(),
+   split.getStart(),
+   split.getLength(),
+   split.getHostnames());
+   }
+
+   /**
+* Constructor with the raw split information.
+*
+* @param modificationTime the modification time of the file this split 
belongs to
+* @param numthe number of this input split
+* @param file   the file name
+* @param start  the position of the first byte in the file to process
+* @param length the number of bytes in the file to process (-1 is flag 
for "read whole file")
+* @param hosts  the list of hosts containing the block, possibly 
null
+*/
+   private RichFileInputSplit(long modificationTime, int num, Path file, 
long start, long length, String[] hosts) {
+   super(num, file, start, length, hosts);
+
+   Preconditions.checkArgument(modificationTime >= 0 || 
modificationTime == Long.MIN_VALUE,
+   "Invalid File Split Modification Time: "+ 
modificationTime +".");
+
+   this.modificationTime = modificationTime;
+   }
+
+   /**
+* Sets the state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* 
+* This is applicable to {@link 
org.apache.flink.api.common.io.FileInputFormat FileInputFormats}
+* that implement the {@link 
org.apache.flink.api.common.io.CheckpointableInputFormat
+* CheckpointableInputFormat} interface.
+* */
+   public void setSplitState(S state) {
+   this.splitState = state;
+   }
+
+   /**
+* Sets the state of the split to 

[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288567
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+	extends FileInputSplit implements Comparable<RichFileInputSplit> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
--- End diff --

```java
private Serializable splitState;
```
should be sufficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84285924
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
--- End diff --

The name rich :) I'd be happy if we could find another name. Rich doesn't 
really mean anything. How about `TimestampedFileInputSplit`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84288480
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
--- End diff --

I think you can drop the type parameter here since you don't gain any type 
safety from the parameter. It is never used in any argument which would make it 
meaningful. Instead just use `Serializable` for the state type.
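
A sketch of how the class could look without the parameter (illustrative; 
the compareTo is shortened to the modification-time part only):

```java
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

import java.io.Serializable;

public class RichFileInputSplit extends FileInputSplit
		implements Comparable<RichFileInputSplit> {

	private final long modificationTime;

	/** The checkpointed state of the split, held as a plain Serializable. */
	private Serializable splitState;

	public RichFileInputSplit(long modificationTime, int num, Path file,
			long start, long length, String[] hosts) {
		super(num, file, start, length, hosts);
		this.modificationTime = modificationTime;
	}

	@Override
	public int compareTo(RichFileInputSplit other) {
		return Long.compare(this.modificationTime, other.modificationTime);
	}
}
```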


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84285533
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichFileInputSplit.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class RichFileInputSplit<S extends Serializable>
+	extends FileInputSplit implements Comparable<RichFileInputSplit> {
+
+   /** The modification time of the file this split belongs to. */
+   private final long modificationTime;
+
+   /**
+* The state of the split. This information is used when
+* restoring from a checkpoint and allows to resume reading the
+* underlying file from the point we left off.
+* */
+   private S splitState;
+
+   /** A special {@link RichFileInputSplit} signaling the end of the 
stream of splits.*/
+   public static final RichFileInputSplit EOS =
+   new RichFileInputSplit<>(Long.MIN_VALUE, -1, null, -1, -1, 
null);
+
+   /**
+* Creates a {@link RichFileInputSplit} based on the file modification 
time and
+* the rest of the information of the {@link FileInputSplit}, as 
returned by the
+* underlying filesystem.
+*
+* @param modificationTime  the modification file of the file this 
split belongs to
+* @param split the rest of the information about the split
+*/
+   public RichFileInputSplit(long modificationTime, FileInputSplit split) {
--- End diff --

Not sure about this constructor. I think I'd prefer something spelling out 
the parameters. This also avoids creating a regular FileInputSplit every time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2618: Refactoring the Continuous File Monitoring Functio...

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2618#discussion_r84280372
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -199,44 +196,39 @@ public void close() throws Exception {
private final Object checkpointLock;
private final SourceFunction.SourceContext readerContext;
 
-	private final Queue<FileInputSplit> pendingSplits;
-
-	private FileInputSplit currentSplit = null;
+	private final Queue<RichFileInputSplit> pendingSplits;
 
-	private S restoredFormatState = null;
+	private RichFileInputSplit currentSplit;
 
-	private volatile boolean isSplitOpen = false;
+	private volatile boolean isSplitOpen;
 
	private SplitReader(FileInputFormat<OT> format,
			TypeSerializer<OT> serializer,
			SourceFunction.SourceContext<OT> readerContext,
			Object checkpointLock,
-			Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+			List<RichFileInputSplit> restoredState) {
 
		this.format = checkNotNull(format, "Unspecified FileInputFormat.");
		this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
		this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
		this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
 
-		this.pendingSplits = new ArrayDeque<>();
		this.isRunning = true;
 
-		// this is the case where a task recovers from a previous failed attempt
-		if (restoredState != null) {
-			List<FileInputSplit> pending = restoredState.f0;
-			FileInputSplit current = restoredState.f1;
-			S formatState = restoredState.f2;
-
-			for (FileInputSplit split : pending) {
-				pendingSplits.add(split);
+		this.pendingSplits = new PriorityQueue<>(100, new Comparator<RichFileInputSplit>() {
--- End diff --

Why did you choose 100 as the initial size?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591881#comment-15591881
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2628
  
Thanks @ggevay for reviewing. I added a commit with additional comments.

I transcribed `Quicksort` so as to remove considerations of Java 
performance and inlining. It was not clear to me that Java would inline the 
various increment and decrement functions if we encapsulated the index, page 
number, and page offset into an object. Also, I don't think this looks too 
bad. I'm happy to reformat if that is preferred.

I think this is the best time to investigate alternative methods. I'm not 
seeing how one would sort on top of `InMemorySorter` without deserializing 
records.


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, and check whether 
> the modulo has reached 0 or the divisor, and if it did, then wrap-around the 
> modulo and increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
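
A minimal sketch of the incremental stepping described above (assuming 
fixed-size records laid out across equal-size memory segments; names are 
illustrative):
{code}
// Tracks a record position as (segment, offset) and steps it by one
// record at a time, replacing division/modulo with a wrap-around check.
final class SortCursor {
	private final int recordSize;
	private final int maxOffset; // recordSize * (recordsPerSegment - 1)

	int segment; // the quotient: which memory segment
	int offset;  // the modulo: byte offset inside the segment

	SortCursor(int recordSize, int recordsPerSegment) {
		this.recordSize = recordSize;
		this.maxOffset = recordSize * (recordsPerSegment - 1);
	}

	void increment() {
		if (offset == maxOffset) { // wrap around to the next segment
			segment++;
			offset = 0;
		} else {
			offset += recordSize;
		}
	}

	void decrement() {
		if (offset == 0) { // wrap around to the previous segment
			segment--;
			offset = maxOffset;
		} else {
			offset -= recordSize;
		}
	}
}
{code}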



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
Github user sedgewickmm18 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84287069
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

These lines expose the TaskManager's RPC and data ports so that they are 
reachable inside the private subnet; please see 
https://docs.docker.com/docker-cloud/apps/ports/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread sedgewickmm18
Github user sedgewickmm18 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84286390
  
--- Diff: flink-contrib/docker-flink/docker-entrypoint.sh ---
@@ -20,11 +20,19 @@
 
 if [ "$1" = "jobmanager" ]; then
 echo "Starting Job Manager"
-sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: 
`hostname -f`/g" $FLINK_HOME/conf/flink-conf.yaml
+#sed -i -e "s/jobmanager.rpc.address: 
localhost/jobmanager.rpc.address: `hostname -f`/g" 
$FLINK_HOME/conf/flink-conf.yaml
+
+# make use of container linking and exploit the jobmanager entry in 
/etc/hosts
+sed -i -e "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: 
jobmanager/g" $FLINK_HOME/conf/flink-conf.yaml
+
 sed -i -e "s/taskmanager.numberOfTaskSlots: 
1/taskmanager.numberOfTaskSlots: `grep -c ^processor /proc/cpuinfo`/g" 
$FLINK_HOME/conf/flink-conf.yaml
--- End diff --

agree - makes much more sense that way - otherwise it's always one slot by 
default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242539
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
--- End diff --

What are these ports needed for? The TaskManager will always initiate the 
connection to the JobManager.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around

2016-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591808#comment-15591808
 ] 

Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM:


WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 


was (Author: jan-zajic):
WE have same issue on Linux in docker, but *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 

> Discarded FileSystem checkpoints are lingering around
> -
>
> Key: FLINK-3902
> URL: https://issues.apache.org/jira/browse/FLINK-3902
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Ufuk Celebi
>
> A user reported that checkpoints with {{FSStateBackend}} are not properly 
> cleaned up.
> {code}
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 
> 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
> 10.10.113.9:49233 Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
> empty': Directory is not empty
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
> {code}
> {code}
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 64 @ 1462875682636
> {code}
> Running the same program with the {{RocksDBBackend}} works as expected and 
> clears the old checkpoints properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-3902) Discarded FileSystem checkpoints are lingering around

2016-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591808#comment-15591808
 ] 

Jan Zajíc edited comment on FLINK-3902 at 10/20/16 1:24 PM:


We have the same issue on Linux in Docker, with *both* RocksDBBackend and 
FSStateBackend. On a Windows developer machine it works just fine. 


was (Author: jan-zajic):
WE have same issue on Linux in docker, but with *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 

> Discarded FileSystem checkpoints are lingering around
> -
>
> Key: FLINK-3902
> URL: https://issues.apache.org/jira/browse/FLINK-3902
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Ufuk Celebi
>
> A user reported that checkpoints with {{FSStateBackend}} are not properly 
> cleaned up.
> {code}
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 
> 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
> 10.10.113.9:49233 Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
> empty': Directory is not empty
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
> {code}
> {code}
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 64 @ 1462875682636
> {code}
> Running the same program with the {{RocksDBBackend}} works as expected and 
> clears the old checkpoints properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3902) Discarded FileSystem checkpoints are lingering around

2016-10-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591808#comment-15591808
 ] 

Jan Zajíc commented on FLINK-3902:
--

WE have same issue on Linux in docker, but *both* ! RocksDBBackend and 
FSStateBackend. On Windows developer local machine it works just fine. 

> Discarded FileSystem checkpoints are lingering around
> -
>
> Key: FLINK-3902
> URL: https://issues.apache.org/jira/browse/FLINK-3902
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Ufuk Celebi
>
> A user reported that checkpoints with {{FSStateBackend}} are not properly 
> cleaned up.
> {code}
> 2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* addToInvalidates: 
> blk_1084791727_11053122 10.10.113.10:50010
> 2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server handler 
> 9 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
> 10.10.113.9:49233 Call#12337 Retry#0
> org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: 
> `/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non 
> empty': Directory is not empty
> at 
> org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712)
> {code}
> {code}
> 2016-05-10 12:20:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 62 @ 1462875622636
> 2016-05-10 12:20:32,507 [flink-akka.actor.default-dispatcher-240088] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 62 (in 9843 ms)
> 2016-05-10 12:20:52,637 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 63 @ 1462875652637
> 2016-05-10 12:21:06,563 [flink-akka.actor.default-dispatcher-240028] INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
> checkpoint 63 (in 13909 ms)
> 2016-05-10 12:21:22,636 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 64 @ 1462875682636
> {code}
> Running the same program with the {{RocksDBBackend}} works as expected and 
> clears the old checkpoints properly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2570: [FLINK-3674] Add an interface for Time aware User ...

2016-10-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2570#discussion_r84255151
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultProcessingTimeService.java
 ---
@@ -35,10 +34,10 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link TimeServiceProvider} which assigns as current processing time 
the result of calling
+ * A {@link ProcessingTimeService} which assigns as current processing 
time the result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link 
ScheduledThreadPoolExecutor}.
  */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
+public class DefaultProcessingTimeService extends ProcessingTimeService {
 
--- End diff --

Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2628: [FLINK-3722] [runtime] Don't / and % when sorting

2016-10-20 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2628#discussion_r84248344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
 ---
@@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) {
 * then switch to {@link HeapSort}.
 */
public void sort(final IndexedSortable s, int p, int r) {
-   sortInternal(s, p, r, getMaxDepth(r - p));
+   int recordsPerSegment = s.recordsPerSegment();
+   int recordSize = s.recordSize();
+
+   int maxOffset = recordSize * (recordsPerSegment - 1);
+
+   int size = s.size();
+   int sizeN = size / recordsPerSegment;
+   int sizeO = (size % recordsPerSegment) * recordSize;
+
+   sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 
0, size, sizeN, sizeO, getMaxDepth(r - p));
}
 
public void sort(IndexedSortable s) {
sort(s, 0, s.size());
}
 
-   private static void sortInternal(final IndexedSortable s, int p, int r, 
int depth) {
+   private static void sortInternal(final IndexedSortable s, int 
recordsPerSegment, int recordSize, int maxOffset,
+   int p, int pN, int pO, int r, int rN, int rO, int 
depth) {
--- End diff --

Could you please add a comment that explains all these parameters? (I 
understand them only because I know the original code and also what you are 
trying to achieve, but for someone who sees the code for the first time this 
will be quite scary.)
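
One possible shape for such a comment, inferred from the diff above 
(illustrative wording only):

```java
/**
 * @param s                 the sortable, laid out as fixed-size records in memory segments
 * @param recordsPerSegment number of records that fit into one memory segment
 * @param recordSize        size of one record in bytes
 * @param maxOffset         recordSize * (recordsPerSegment - 1), the last valid offset
 * @param p                 index of the first record of the range to sort
 * @param pN                segment number of p, i.e. p / recordsPerSegment
 * @param pO                byte offset of p inside its segment, i.e. (p % recordsPerSegment) * recordSize
 * @param r                 index one past the last record of the range
 * @param rN                segment number of r, defined like pN
 * @param rO                byte offset of r, defined like pO
 * @param depth             remaining recursion depth before switching to HeapSort
 */
```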


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-20 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2618
  
Thanks a lot @mxm !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2618: Refactoring the Continuous File Monitoring Function.

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2618
  
Thanks for updating the description. Let me take a look at the changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2657: [FLINK-4853] [rm] Harden job manager registration at the ...

2016-10-20 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2657
  
This doesn't compile currently. Would you prefer that I review the PRs 
individually, or review the commits in this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591741#comment-15591741
 ] 

ASF GitHub Bot commented on FLINK-4853:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2657
  
This doesn't compile currently. Would you prefer that I review the PRs 
individually, or review the commits in this PR?


> Clean up JobManager registration at the ResourceManager
> ---
>
> Key: FLINK-4853
> URL: https://issues.apache.org/jira/browse/FLINK-4853
> Project: Flink
>  Issue Type: Sub-task
>  Components: ResourceManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The current {{JobManager}} registration at the {{ResourceManager}} blocks 
> threads in the {{RpcService.execute}} pool. This is not ideal and can be 
> avoided by not waiting on a {{Future}} in this call.
> I propose to encapsulate the leader id retrieval operation in a distinct 
> service so that it can be separated from the {{ResourceManager}}. This will 
> reduce the complexity of the {{ResourceManager}} and make the individual 
> components easier to test.
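
A hypothetical shape for such a service (name and signature are 
assumptions, not taken from the actual change):
{code}
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Retrieves the leader session id of a job's master asynchronously,
// so no thread of the RpcService.execute pool has to block on a Future.
public interface JobMasterLeaderIdService {

	CompletableFuture<UUID> getLeaderId(String jobId);
}
{code}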



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3722) The divisions in the InMemorySorters' swap/compare methods hurt performance

2016-10-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591734#comment-15591734
 ] 

ASF GitHub Bot commented on FLINK-3722:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/2628#discussion_r84248344
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/QuickSort.java
 ---
@@ -49,20 +49,40 @@ protected static int getMaxDepth(int x) {
 * then switch to {@link HeapSort}.
 */
public void sort(final IndexedSortable s, int p, int r) {
-   sortInternal(s, p, r, getMaxDepth(r - p));
+   int recordsPerSegment = s.recordsPerSegment();
+   int recordSize = s.recordSize();
+
+   int maxOffset = recordSize * (recordsPerSegment - 1);
+
+   int size = s.size();
+   int sizeN = size / recordsPerSegment;
+   int sizeO = (size % recordsPerSegment) * recordSize;
+
+   sortInternal(s, recordsPerSegment, recordSize, maxOffset, 0, 0, 
0, size, sizeN, sizeO, getMaxDepth(r - p));
}
 
public void sort(IndexedSortable s) {
sort(s, 0, s.size());
}
 
-   private static void sortInternal(final IndexedSortable s, int p, int r, 
int depth) {
+   private static void sortInternal(final IndexedSortable s, int 
recordsPerSegment, int recordSize, int maxOffset,
+   int p, int pN, int pO, int r, int rN, int rO, int 
depth) {
--- End diff --

Could you please add a comment that explains all these parameters? (I 
understand them only because I know the original code and also what you are 
trying to achieve, but for someone who sees the code for the first time this 
will be quite scary.)


> The divisions in the InMemorySorters' swap/compare methods hurt performance
> ---
>
> Key: FLINK-3722
> URL: https://issues.apache.org/jira/browse/FLINK-3722
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Greg Hogan
>Priority: Minor
>  Labels: performance
>
> NormalizedKeySorter's and FixedLengthRecordSorter's swap and compare methods 
> use divisions (which take a lot of time \[1\]) to calculate the index of the 
> MemorySegment and the offset inside the segment. [~greghogan] reported on the 
> mailing list \[2\] measuring a ~12-14% performance effect in one case.
> A possibility to improve the situation is the following:
> The way that QuickSort mostly uses these compare and swap methods is that it 
> maintains two indices, and uses them to call compare and swap. The key 
> observation is that these indices are mostly stepped by one, and 
> _incrementally_ calculating the quotient and modulo is actually easy when the 
> index changes only by one: increment/decrement the modulo, check whether the 
> modulo has reached 0 or the divisor, and if it did, wrap the modulo around and 
> increment/decrement the quotient.
> To implement this, InMemorySorter would have to expose an iterator that would 
> have the divisor and the current modulo and quotient as state, and have 
> increment/decrement methods. Compare and swap could then have overloads that 
> take these iterators as arguments.
> \[1\] http://www.agner.org/optimize/instruction_tables.pdf
> \[2\] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Macro-benchmarking-for-performance-tuning-and-regression-detection-td11078.html
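
A minimal sketch of the incremental bookkeeping described above (hypothetical 
names; the {{pN}}/{{pO}} and {{rN}}/{{rO}} parameters in the PR's diff carry the 
same segment-index/byte-offset pair):

{code:scala}
// Steps a (segment, byte offset) cursor by one record without any
// division or modulo in the hot loop; callers keep it within bounds.
final class RecordCursor(recordsPerSegment: Int, recordSize: Int) {
  private val maxOffset = recordSize * (recordsPerSegment - 1)

  var segment = 0  // the quotient: which MemorySegment the record lives in
  var offset  = 0  // the modulo, pre-multiplied by recordSize: offset in segment

  def increment(): Unit =
    if (offset == maxOffset) { segment += 1; offset = 0 }
    else offset += recordSize

  def decrement(): Unit =
    if (offset == 0) { segment -= 1; offset = maxOffset }
    else offset -= recordSize
}
{code}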



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation

2016-10-20 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-4866:
---

 Summary: Make Trigger.clear() Abstract to Enforce Implementation
 Key: FLINK-4866
 URL: https://issues.apache.org/jira/browse/FLINK-4866
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek


If the method is not abstract, implementors of custom triggers will not realise 
that overriding it could be necessary, and they will likely not clean up their 
state/timers properly.
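
A simplified sketch of the effect, with stand-in types rather than Flink's actual 
Trigger API: once {{clear()}} is abstract, forgetting cleanup becomes a compile 
error instead of a silent state/timer leak.

{code:scala}
trait TriggerContext {
  def deleteProcessingTimeTimer(time: Long): Unit
}

abstract class Trigger[W] {
  def onElement(window: W, ctx: TriggerContext): Unit
  // With an empty default body this is easy to overlook; abstract forces
  // every implementor to state explicitly what has to be released.
  def clear(window: W, ctx: TriggerContext): Unit
}

final class OneShotTrigger[W](fireAt: Long) extends Trigger[W] {
  override def onElement(window: W, ctx: TriggerContext): Unit = ()
  override def clear(window: W, ctx: TriggerContext): Unit =
    ctx.deleteProcessingTimeTimer(fireAt)  // cleanup the compiler now insists on
}
{code}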



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4865:
---
Description: 
We can only call the "evaluate" method on a DataSet[(Vector,Double)].

Eg: if our model is an SVM:

svm.evaluate(test) where test has type DataSet[(Vector,Double)]

We would like to be able to call it on DataSet[LabeledVector] as well.

 


  was:
We can only call the "evaluate" method on a DataSet[(Double,Vector)].

Eg: if our model is an SVM:

svm.evaluate(test) where test has type DataSet[(Double,Vector)]

We would like to be able to call it on DataSet[LabeledVector] as well.

 



> FlinkML - Add EvaluateDataSet operation for LabeledVector
> -
>
> Key: FLINK-4865
> URL: https://issues.apache.org/jira/browse/FLINK-4865
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> We can only call the "evaluate" method on a DataSet[(Vector,Double)].
> Eg: if our model is an SVM:
> svm.evaluate(test) where test has type DataSet[(Vector,Double)]
> We would like to be able to call it on DataSet[LabeledVector] as well.
>  
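
Until such an overload exists, the adaptation the feature would fold into FlinkML 
can be written by hand; a sketch, assuming FlinkML's LabeledVector(label, vector) 
shape and a hypothetical helper name:

{code:scala}
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector

// Adapt a DataSet[LabeledVector] to the (Vector, Double) pairs that
// evaluate() accepts today.
def asEvaluatePairs(ds: DataSet[LabeledVector]): DataSet[(Vector, Double)] =
  ds.map(lv => (lv.vector, lv.label))

// val predictionPairs = svm.evaluate(asEvaluatePairs(test))
{code}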



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4865:
---
Description: 
We can only call the "evaluate" method on a DataSet[(Double,Vector)].

Eg: if our model is an SVM:

svm.evaluate(test) where test has type DataSet[(Double,Vector)]

We would like to be able to call it on DataSet[LabeledVector] as well.

 


  was:
We can only call the "evaluate" method on a DataSet[(Double,Vector)].

Eg: svm.evaluate(test) where test: DataSet[(Double,Vector)]

We also want to call this method on DataSet[LabeledVector].

 



> FlinkML - Add EvaluateDataSet operation for LabeledVector
> -
>
> Key: FLINK-4865
> URL: https://issues.apache.org/jira/browse/FLINK-4865
> Project: Flink
>  Issue Type: New Feature
>Reporter: Thomas FOURNIER
>Priority: Minor
>
> We can only call the "evaluate" method on a DataSet[(Double,Vector)].
> Eg: if our model is an SVM:
> svm.evaluate(test) where test has type DataSet[(Double,Vector)]
> We would like to be able to call it on DataSet[LabeledVector] as well.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4865) FlinkML - Add EvaluateDataSet operation for LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)
Thomas FOURNIER created FLINK-4865:
--

 Summary: FlinkML - Add EvaluateDataSet operation for LabeledVector
 Key: FLINK-4865
 URL: https://issues.apache.org/jira/browse/FLINK-4865
 Project: Flink
  Issue Type: New Feature
Reporter: Thomas FOURNIER
Priority: Minor


We can only call the "evaluate" method on a DataSet[(Double,Vector)].

Eg: svm.evaluate(test) where test: DataSet[(Double,Vector)]

We also want to call this method on DataSet[LabeledVector].

 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242735
  
--- Diff: flink-contrib/docker-flink/docker-compose.sh ---
@@ -0,0 +1,4 @@
+#!/bin/sh
--- End diff --

Could we name this file `bluemix-docker-compose.sh`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2667: README.md - Description of the bluemix specif…

2016-10-20 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2667#discussion_r84242416
  
--- Diff: flink-contrib/docker-flink/docker-compose.yml ---
@@ -20,16 +20,20 @@ version: "2"
 services:
   jobmanager:
 image: flink
+container_name: "jobmanager"
+expose:
+  - "6123"
 ports:
   - "48081:8081"
 command: jobmanager
-volumes:
-  - /opt/flink/conf
 
   taskmanager:
 image: flink
+expose:
+  - "6121"
+  - "6122"
 depends_on:
   - jobmanager
 command: taskmanager
-volumes_from:
-  - jobmanager:ro
+links:
--- End diff --

`links` has been a legacy feature since Docker 1.9.0, but it's probably fine to 
stick with it for now.
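
For reference, the non-legacy equivalent would simply drop the entry: with 
compose file format 2, services share a default network and resolve each other 
by service name, e.g.

```yaml
  taskmanager:
    image: flink
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    # no `links:` needed; "jobmanager" resolves via the default network
```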


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4864) Shade Calcite dependency in flink-table

2016-10-20 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4864:


 Summary: Shade Calcite dependency in flink-table
 Key: FLINK-4864
 URL: https://issues.apache.org/jira/browse/FLINK-4864
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.2.0
Reporter: Fabian Hueske


The Table API has a dependency on Apache Calcite.
A user reported version conflicts when having their own Calcite dependency 
on the classpath.

The solution would be to shade away the Calcite dependency (Calcite's 
transitive dependencies are already shaded).
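
For illustration, a relocation along these lines in the flink-table pom would 
achieve it (the shaded pattern here is made up for the example, not necessarily 
what the fix will use):

{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Rewrites org.apache.calcite.* bytecode and references into a
               Flink-private namespace so user classpaths cannot clash. -->
          <relocation>
            <pattern>org.apache.calcite</pattern>
            <shadedPattern>org.apache.flink.shaded.calcite.org.apache.calcite</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}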



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15591644#comment-15591644
 ] 

Thomas FOURNIER commented on FLINK-4850:


OK, I'm creating a specific JIRA issue for adding an EvaluateDataSet 
operation for LabeledVector.

> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>Assignee: Theodore Vasiloudis
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> It impacts the FlinkML QuickStart guide when using SVM.
> We need to update the documentation as follows:
> val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4850) FlinkML - SVM predict Operation for Vector and not LabeledVector

2016-10-20 Thread Thomas FOURNIER (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas FOURNIER updated FLINK-4850:
---
Description: 
It seems that the evaluate operation is defined for Vector and not LabeledVector.
It impacts the FlinkML QuickStart guide when using SVM.

We need to update the documentation as follows:

val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)




  was:
It seems that the evaluate operation is defined for Vector and not LabeledVector.
It impacts the FlinkML QuickStart guide when using SVM.

1- We need to update the documentation as follows:

 val astroTest:DataSet[(Vector,Double)] = MLUtils
  .readLibSVM(env, "src/main/resources/svmguide1.t")
  .map(l => (l.vector, l.label))

val predictionPairs = svm.evaluate(astroTest)


2- Update code such that LabeledVector can be used with evaluate method









> FlinkML - SVM predict Operation for Vector and not LabeledVector
> 
>
> Key: FLINK-4850
> URL: https://issues.apache.org/jira/browse/FLINK-4850
> Project: Flink
>  Issue Type: Bug
>Reporter: Thomas FOURNIER
>Assignee: Theodore Vasiloudis
>
> It seems that the evaluate operation is defined for Vector and not LabeledVector.
> It impacts the FlinkML QuickStart guide when using SVM.
> We need to update the documentation as follows:
> val astroTest:DataSet[(Vector,Double)] = MLUtils
>   .readLibSVM(env, "src/main/resources/svmguide1.t")
>   .map(l => (l.vector, l.label))
> val predictionPairs = svm.evaluate(astroTest)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

