[jira] [Created] (FLINK-2670) Instable Test S3FileSystemTest

2015-09-15 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2670:
--

 Summary: Instable Test S3FileSystemTest
 Key: FLINK-2670
 URL: https://issues.apache.org/jira/browse/FLINK-2670
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Matthias J. Sax
Priority: Critical


Fails with
{noformat}
==
Maven produced no output for 300 seconds.
==
{noformat}

https://travis-ci.org/apache/flink/jobs/80344487



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2671) Instable Test StreamCheckpointNotifierITCase

2015-09-15 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created FLINK-2671:
--

 Summary: Instable Test StreamCheckpointNotifierITCase
 Key: FLINK-2671
 URL: https://issues.apache.org/jira/browse/FLINK-2671
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Matthias J. Sax
Priority: Critical


{noformat}
Failed tests: 
  
StreamCheckpointNotifierITCase>StreamFaultToleranceTestBase.runCheckpointedProgram:105->postSubmit:115
 No checkpoint notification was received.{noformat}

https://travis-ci.org/apache/flink/jobs/80344489



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744974#comment-14744974
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140301180
  
Hi, I cleaned up all old commits and put a new commit on top introducing 
`SourceFunction.stop()`, which unblocks the stop signal using its own thread. Please 
give feedback.

Btw: Travis fails due to an unstable test. My own Travis is green: 
https://travis-ci.org/mjsax/flink/builds/80344479
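For readers following along, a minimal sketch of what a source reacting to such a 
stop signal could look like. The `stop()` method and its semantics are taken from the 
comment above; the class name and counter logic are made up for illustration.

{code:java}
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustration only: a source that ends its run() loop gracefully when
// stop() is called, as opposed to the "hard" cancel().
public class StoppableCounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
        }
        // returning from run() lets downstream operators drain and the job finish cleanly
    }

    // stop() as proposed in the pull request; name and placement are assumptions here
    public void stop() {
        running = false;
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}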


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> so that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Getter for wrapped StreamExecutionEnvironment ...

2015-09-15 Thread lofifnc
Github user lofifnc commented on the pull request:

https://github.com/apache/flink/pull/1120#issuecomment-140301283
  
Hey,

This is mostly for testing purposes. I'm working on some tooling which 
takes a `StreamExecutionEnvironment` and performs integration tests. In the 
current state I'm not able to work with a `StreamExecutionEnvironment` defined 
with the Scala API.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140301180
  
Hi, I cleaned up all old commits and put a new commit on top introducing 
`SourceFunction.stop()`, which unblocks the stop signal using its own thread. Please 
give feedback.

Btw: Travis fails due to an unstable test. My own Travis is green: 
https://travis-ci.org/mjsax/flink/builds/80344479


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

2015-09-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140301378
  
Storm only supports one global configuration that is shared across all 
spouts/bolts, so `GlobalJobParameter` will work just fine.
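
To make that concrete, a minimal sketch of attaching one job-wide configuration object 
via Flink's `ExecutionConfig.GlobalJobParameters`; the `StormConf` holder and its 
parameter name are made up for this example.

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalConfigExample {

    // A made-up GlobalJobParameters holder carrying a single user parameter.
    public static class StormConf extends ExecutionConfig.GlobalJobParameters {
        private final String topologyName;

        public StormConf(String topologyName) {
            this.topologyName = topologyName;
        }

        @Override
        public Map<String, String> toMap() {
            return Collections.singletonMap("topology.name", topologyName);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // One job-wide configuration object, analogous to Storm's single config map.
        env.getConfig().setGlobalJobParameters(new StormConf("word-count"));

        // ... build the topology and call env.execute() ...
    }
}
{code}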


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2525) Add configuration support in Storm-compatibility

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744978#comment-14744978
 ] 

ASF GitHub Bot commented on FLINK-2525:
---

Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/1046#issuecomment-140301378
  
Storm only supports one global configuration that is shared across all 
spouts/bolts, so `GlobalJobParameter` will work just fine.


> Add configuration support in Storm-compatibility
> 
>
> Key: FLINK-2525
> URL: https://issues.apache.org/jira/browse/FLINK-2525
> Project: Flink
>  Issue Type: New Feature
>  Components: Storm Compatibility
>Reporter: fangfengbin
>Assignee: fangfengbin
>
> Spouts and Bolts are initialized by a call to `Spout.open(...)` and 
> `Bolt.prepare()`, respectively. Both methods have a config `Map` as their first 
> parameter. This map is currently not populated, so Spouts and Bolts cannot 
> be configured with user-defined parameters. In order to support this feature, the 
> spout and bolt wrapper classes need to be extended to create a proper `Map` 
> object. Furthermore, the clients need to be extended to take a `Map` and 
> translate it into a Flink `Configuration` that is forwarded to the wrappers 
> for proper initialization of the map.
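
For illustration of the Storm side of that contract, a small bolt that expects a 
user parameter in the config `Map` passed to `prepare()`; the parameter name and 
the bolt's logic are invented for this sketch.

{code:java}
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// The bolt reads its user parameter from the config Map handed to prepare();
// this is exactly the map the Flink wrappers would have to populate.
public class ThresholdBolt extends BaseRichBolt {

    private OutputCollector collector;
    private int threshold;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // "bolt.threshold" is a made-up user-defined parameter
        Object value = conf.get("bolt.threshold");
        this.threshold = value == null ? 0 : Integer.parseInt(value.toString());
    }

    @Override
    public void execute(Tuple input) {
        int measurement = input.getInteger(0);
        if (measurement > threshold) {
            collector.emit(new Values(measurement));
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("measurement"));
    }
}
{code}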



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Hits

2015-09-15 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/765#issuecomment-140314739
  
Hi @mfahimazizi,

as I said, it is not possible to access the edge values inside the 
`VertexUpdateFunction`.
However, you can get the same functionality if you add the edge value 
inside the message that you create in the `MessagingFunction`. Alternatively, 
you can build your own delta iteration, instead of using the vertex-centric 
model.
If you're planning to finish this PR, then let us know and we can even sync 
on Skype or so to help you out! Otherwise, please close this PR and hopefully 
someone else will pick up this issue.
Thank you!
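
To make the suggested workaround concrete, a rough sketch of packing the edge value 
into the message; the exact Gelly vertex-centric signatures of that time may differ 
from what is shown, so treat this purely as an illustration of the idea.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessagingFunction;

// The message is a (senderValue, edgeValue) pair, so the vertex update
// function can use the edge value even though it cannot read edges directly.
public class EdgeValueMessenger
        extends MessagingFunction<Long, Double, Tuple2<Double, Double>, Double> {

    @Override
    public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
        for (Edge<Long, Double> edge : getEdges()) {
            sendMessageTo(edge.getTarget(),
                    new Tuple2<>(vertex.getValue(), edge.getValue()));
        }
    }
}
{code}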


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745054#comment-14745054
 ] 

Chiwan Park commented on FLINK-1745:


Hi [~danielblazevski], you can modify my implementation by pulling my issue 
branch (https://github.com/chiwanpark/flink/tree/FLINK-1745). After 
implementing it, please open a pull request for review. Then I'll close my PR and 
review yours.

If you have any questions about my implementation, post them to this 
thread. :-)

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
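
For orientation, a minimal brute-force sketch of exact kNN in Flink's Java DataSet 
API; the linked branch implements the blocked H-BNLJ/H-BRJ variants instead, so this 
only illustrates the basic cross-and-select idea.

{code:java}
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

// Brute-force exact kNN: cross every test point with every training point,
// compute the distance, and keep the k closest training points per test point.
public class NaiveKnn {

    public static DataSet<Tuple3<Long, Long, Double>> knn(
            DataSet<Tuple2<Long, double[]>> test,
            DataSet<Tuple2<Long, double[]>> training,
            int k) {

        return test.cross(training)
            .with(new CrossFunction<Tuple2<Long, double[]>, Tuple2<Long, double[]>,
                    Tuple3<Long, Long, Double>>() {
                @Override
                public Tuple3<Long, Long, Double> cross(
                        Tuple2<Long, double[]> t, Tuple2<Long, double[]> r) {
                    return new Tuple3<>(t.f0, r.f0, euclidean(t.f1, r.f1));
                }
            })
            .groupBy(0)                     // group candidates by test point id
            .sortGroup(2, Order.ASCENDING)  // sort candidates by distance
            .first(k);                      // keep the k nearest neighbours
    }

    static double euclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }
}
{code}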



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745055#comment-14745055
 ] 

Chiwan Park commented on FLINK-1745:


[~till.rohrmann] Could you assign this issue to [~danielblazevski]? I can't 
find him in the assignee list.

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2672) Add partitioned output format to HDFS RollingSink

2015-09-15 Thread Mohamed Amine ABDESSEMED (JIRA)
Mohamed Amine ABDESSEMED created FLINK-2672:
---

 Summary: Add partitioned output format to HDFS RollingSink
 Key: FLINK-2672
 URL: https://issues.apache.org/jira/browse/FLINK-2672
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Affects Versions: 0.10
Reporter: Mohamed Amine ABDESSEMED
Priority: Minor


An interesting use case of the HDFS Sink is to dispatch data into multiple 
directories depending on attributes present in the source data.
For example, for some data with timestamp and status fields, we want to 
write it into different directories using a pattern like: 
/somepath/%{timestamp}/%{status}

The expected results are something like: 
/somepath/some_timestamp/wellformed
/somepath/some_timestamp/malformed
/somepath/some_timestamp/incomplete 
... 
etc.

To support this functionality, the bucketing and checkpointing logic needs to be 
changed. 

Note: For now, this can be done using the current version of the Rolling HDFS 
Sink with the help of splitting data streams and having multiple HDFS sinks.
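
A minimal sketch of that workaround, assuming the `RollingSink` from the pending pull 
request with a base-path constructor and records modelled as Tuple2<payload, status>; 
the status values and paths are made up, and the dynamic timestamp part of the pattern 
is exactly what the requested feature would add.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.RollingSink;

// Split the stream with filters and attach one RollingSink per target directory.
public class PartitionedSinkWorkaround {

    public static void attachSinks(DataStream<Tuple2<String, String>> events, String basePath) {
        for (final String status : new String[] {"wellformed", "malformed", "incomplete"}) {
            events.filter(t -> status.equals(t.f1))   // t.f1 holds the status field
                  .addSink(new RollingSink<Tuple2<String, String>>(basePath + "/" + status));
        }
    }
}
{code}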




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2582) Document how to build Flink with other Scala versions

2015-09-15 Thread Chiwan Park (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chiwan Park resolved FLINK-2582.

Resolution: Fixed

> Document how to build Flink with other Scala versions
> -
>
> Key: FLINK-2582
> URL: https://issues.apache.org/jira/browse/FLINK-2582
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 0.10
>
>
> One can build Flink for different Scala versions.
> We should describe in the documentation how to do that, ideally next to 
> building for different Hadoop versions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2672) Add partitioned output format to HDFS RollingSink

2015-09-15 Thread Mohamed Amine ABDESSEMED (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohamed Amine ABDESSEMED updated FLINK-2672:

Description: 
An interesting use case of the HDFS Sink is to dispatch data into multiple 
directories depending on attributes present in the source data.
For example, for some data with timestamp and status fields, we want to 
write it into different directories using a pattern like: 
/somepath/%{timestamp}/%{status}

The expected results are something like: 
/somepath/some_timestamp/wellformed
/somepath/some_timestamp/malformed
/somepath/some_timestamp/incomplete 
... 
etc.

To support this functionality, the bucketing and checkpointing logic needs to be 
changed. 

Note: For now, this can be done using the current version of the Rolling HDFS 
Sink (https://github.com/apache/flink/pull/1084) with the help of splitting 
data streams and having multiple HDFS sinks.



  was:
An interesting use case of the HDFS Sink is to dispatch data into multiple 
directories depending on attributes present in the source data.
For example, for some data with timestamp and status fields, we want to 
write it into different directories using a pattern like: 
/somepath/%{timestamp}/%{status}

The expected results are something like: 
/somepath/some_timestamp/wellformed
/somepath/some_timestamp/malformed
/somepath/some_timestamp/incomplete 
... 
etc.

To support this functionality, the bucketing and checkpointing logic needs to be 
changed. 

Note: For now, this can be done using the current version of the Rolling HDFS 
Sink with the help of splitting data streams and having multiple HDFS sinks.



> Add partitioned output format to HDFS RollingSink
> -
>
> Key: FLINK-2672
> URL: https://issues.apache.org/jira/browse/FLINK-2672
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 0.10
>Reporter: Mohamed Amine ABDESSEMED
>Priority: Minor
>  Labels: features
>
> An interesting use case of the HDFS Sink is to dispatch data into multiple 
> directories depending on attributes present in the source data.
> For example, for some data with timestamp and status fields, we want to 
> write it into different directories using a pattern like: 
> /somepath/%{timestamp}/%{status}
> The expected results are something like: 
> /somepath/some_timestamp/wellformed
> /somepath/some_timestamp/malformed
> /somepath/some_timestamp/incomplete 
> ... 
> etc.
> To support this functionality, the bucketing and checkpointing logic needs to 
> be changed. 
> Note: For now, this can be done using the current version of the Rolling HDFS 
> Sink (https://github.com/apache/flink/pull/1084) with the help of splitting 
> data streams and having multiple HDFS sinks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745063#comment-14745063
 ] 

Till Rohrmann commented on FLINK-1745:
--

Done.

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-09-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-1745:
-
Assignee: Daniel Blazevski

> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial, 
> it is still used as a means to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745062#comment-14745062
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aminouvic commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140325179
  
Yeah, you're right, better to have an operational version of the sink first. 
Follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672 


> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> In addition to having configurable file-rolling behavior the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2583] Add Stream Sink For Rolling HDFS ...

2015-09-15 Thread aminouvic
Github user aminouvic commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140325179
  
Yeah, you're right, better to have an operational version of the sink first. 
Follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2665] [api-breaking] [runtime] Makes Ex...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1128#issuecomment-140325565
  
LGTM, will merge this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2665) ExecutionConfig is not serializable

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745066#comment-14745066
 ] 

ASF GitHub Bot commented on FLINK-2665:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1128#issuecomment-140325565
  
LGTM, will merge this


> ExecutionConfig is not serializable
> ---
>
> Key: FLINK-2665
> URL: https://issues.apache.org/jira/browse/FLINK-2665
> Project: Flink
>  Issue Type: Bug
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{ExecutionConfig}} is not serializable, because the 
> {{ExecutionConfig.Entry}} can contain non-serializable key and value 
> field values.
> I suggest adding a type bound for {{K}} and {{V}}.
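
A sketch of the suggested fix in isolation (not the actual Flink code): bounding the 
key and value types guarantees that every entry, and hence the enclosing config 
object, can be serialized.

{code:java}
import java.io.Serializable;

// Generic entry whose type bounds force both key and value to be serializable.
public class SerializableEntry<K extends Serializable, V extends Serializable>
        implements Serializable {

    private static final long serialVersionUID = 1L;

    private final K key;
    private final V value;

    public SerializableEntry(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K getKey() {
        return key;
    }

    public V getValue() {
        return value;
    }
}
{code}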



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Getter for wrapped StreamExecutionEnvironment ...

2015-09-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1120#issuecomment-140326566
  
I think adding this accessor is fine.

Also, in case someone goes crazy and wants to mix the Java and Scala-style 
functions in one program ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

2015-09-15 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140327645
  
Hi @twalthr, thanks for your contribution. But this PR contains many 
changes unrelated to the HCatalog format. Maybe we should split this PR into 
the HCatalog changes and the other changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745078#comment-14745078
 ] 

ASF GitHub Bot commented on FLINK-2167:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140327645
  
Hi @twalthr, thanks for your contribution. But this PR contains many 
changes unrelated to the HCatalog format. Maybe we should split this PR into 
the HCatalog changes and the other changes.


> Add fromHCat() to TableEnvironment
> --
>
> Key: FLINK-2167
> URL: https://issues.apache.org/jira/browse/FLINK-2167
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Minor
>  Labels: starter
>
> Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} 
> from an HCatalog table.
> The implementation could reuse Flink's HCatInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745084#comment-14745084
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488168
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

Shouldn't it be the other way round, i.e., use 
`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` if defined and `KEY_TASKM_MEM_SEGMENT_SIZE` 
otherwise?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488168
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

Shouldn't it be the other way round, i.e., use 
`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` if defined and `KEY_TASKM_MEM_SEGMENT_SIZE` 
otherwise?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745092#comment-14745092
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488510
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
--- End diff --

typo +"a"


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488510
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
--- End diff --

typo +"a"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745094#comment-14745094
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488711
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Add the validity bounds 0 < x < 1 to the message?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488711
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Add the validity bounds 0 < x < 1 to the message?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745095#comment-14745095
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488877
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Are we sure that 1MB is enough for Netty?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488925
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` is the deprecated key. In the existing 
Flink code, it is overridden by the new default network buffer size key 
`KEY_TASKM_MEM_SEGMENT_SIZE`.
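
For reference, the precedence described above expressed as a small Java sketch; the 
key names and the 32768-byte default are approximations, not verbatim Flink constants.

{code:java}
import org.apache.flink.configuration.Configuration;

// The new segment-size key wins; the deprecated network buffer-size key is
// only used as a fallback when the new key is not set.
public class NetworkBufferSize {

    public static int resolve(Configuration config) {
        int deprecated = config.getInteger("taskmanager.network.bufferSizeInBytes", 32768);
        return config.getInteger("taskmanager.memory.segment-size", deprecated);
    }
}
{code}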


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488877
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Are we sure that 1MB is enough for Netty?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745096#comment-14745096
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488925
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$"
 
 # Define FLINK_JM_HEAP if it is not already set
 if [ -z "${FLINK_JM_HEAP}" ]; then
-FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
-FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}")
+FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
+FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
+if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
+FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig 
${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
+fi
+
+# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
+if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
+BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" 
"${YAML_CONF}")
+if [ "${BUFFER_SIZE}" -eq "0" ]; then
--- End diff --

`KEY_TASKM_MEM_NETWORK_BUFFER_SIZE` is the deprecated key. In the existing 
Flink code, it is overridden by the new default network buffer size key 
`KEY_TASKM_MEM_SEGMENT_SIZE`.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745098#comment-14745098
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488999
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
+exit 1
+fi
+# recalculate the JVM heap memory by taking the off-heap 
ratio into account
+TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< 
"${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+fi
+fi
+
+export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M 
-Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE * 1024 * 1024 + 
FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))"
--- End diff --

Append "M" to -XX:MaxDirectMemorySize parameter?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39488999
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
+exit 1
+fi
+# recalculate the JVM heap memory by taking the off-heap 
ratio into account
+TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< 
"${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+fi
+fi
+
+export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M 
-Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE * 1024 * 1024 + 
FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))"
--- End diff --

Append "M" to -XX:MaxDirectMemorySize parameter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489755
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

inconsistent code style, linebreak after closing `'}'`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489771
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

line break


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745117#comment-14745117
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489755
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

inconsistent code style, linebreak after closing `'}'`


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745118#comment-14745118
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39489771
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

line break


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2111) Add "stop" signal to cleanly shutdown streaming jobs

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745121#comment-14745121
 ] 

ASF GitHub Bot commented on FLINK-2111:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140335366
  
IMO, it would be better to wrap the `stopExecution` call in the `TaskManager` 
into a `Future`, for the following reasons: First, with the current 
implementation you will miss all exceptions that occur in the `stop` method. 
Second, you will send a positive `TaskOperationResult` back before the stopping 
has actually been executed. I haven't checked the semantics of 
`TaskOperationResult`, but the JM might interpret this message as meaning that 
the stop call was executed successfully.


> Add "stop" signal to cleanly shutdown streaming jobs
> 
>
> Key: FLINK-2111
> URL: https://issues.apache.org/jira/browse/FLINK-2111
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime, JobManager, Local Runtime, 
> Streaming, TaskManager, Webfrontend
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> Currently, streaming jobs can only be stopped using the "cancel" command, which is 
> a "hard" stop with no clean shutdown.
> The newly introduced "stop" signal will only affect streaming source tasks, 
> so that the sources can stop emitting data and shut down cleanly, resulting 
> in a clean shutdown of the whole streaming job.
> This feature is a prerequisite for 
> https://issues.apache.org/jira/browse/FLINK-1929



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140335366
  
IMO, it would be better to wrap the `stopExecution` call in the `TaskManager` 
into a `Future`, for the following reasons: First, with the current 
implementation you will miss all exceptions that occur in the `stop` method. 
Second, you will send a positive `TaskOperationResult` back before the stopping 
has actually been executed. I haven't checked the semantics of 
`TaskOperationResult`, but the JM might interpret this message as meaning that 
the stop call was executed successfully.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
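
A minimal Java sketch of the idea above, purely for illustration: the stop call 
runs asynchronously and the result is reported only once the call has completed 
or failed. `stopExecution`, `reply` and the simplified `TaskOperationResult` 
below are stand-ins, not Flink's actual TaskManager (Akka) code.

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: stand-in types and methods, not Flink's actual TaskManager code.
public class StopTaskSketch {

    /** Simplified stand-in for Flink's TaskOperationResult. */
    static class TaskOperationResult {
        final boolean success;
        final String description;

        TaskOperationResult(boolean success, String description) {
            this.success = success;
            this.description = description;
        }
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Stand-in for the potentially blocking / throwing stop call on the task. */
    private void stopExecution() throws Exception {
        // ...
    }

    /** Stand-in for sending the result back to the JobManager. */
    private void reply(TaskOperationResult result) {
        // ...
    }

    /** Runs the stop call asynchronously and replies only after it finished or failed. */
    public void handleStopTask() {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stopExecution();
                    reply(new TaskOperationResult(true, null));
                } catch (Exception e) {
                    reply(new TaskOperationResult(false, e.getMessage()));
                }
            }
        });
    }
}
{code}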


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745131#comment-14745131
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39490684
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Shouldn't this be (HEAPSIZE / (1.0 - fraction)) - HEAPSIZE?


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39490684
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Shouldn't this be (HEAPSIZE / (1.0 - fraction)) - HEAPSIZE?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745132#comment-14745132
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140336732
  
Thanks for the PR @mxm.
I left a few comments inline. 


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140336732
  
Thanks for the PR @mxm.
I left a few comments inline. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492448
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Internal representation of a parameter passed to a user defined 
function.
+ */
+public class Option {
+
+   private String longName;
+   private String shortName;
+
+   private T defaultValue;
+   private Set choices;
+
+   private String helpText;
+
+   public Option(String name) {
+   this.longName = name;
+   this.choices = new HashSet<>();
+   }
+
+   /**
+* Define a alternative / short name of the Parameter.
+*
+* @param shortName - short version of the parameter name
+* @return the updated Option
+*/
+   public Option alt(String shortName) {
+   this.shortName = shortName;
+   return this;
+   }
+
+   /**
+* Define a default value for the option.
+*
+* Throws an exception if the list of possible values for the parameter 
is not empty and the default value passed
+* is not in the list.
+*
+* @param defaultValue - the default value
+* @return the updated Option
+*/
+   public Option defaultValue(T defaultValue) {
+   if (this.choices.isEmpty()) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   if (this.choices.contains(defaultValue)) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   throw new 
IllegalArgumentException("defaultValue passed is not in the list of expected 
values.");
+   }
+   }
+   }
+
+   /**
+* Restrict the list of possible values of the parameter.
+*
+* @param choices - the allowed values of the parameter.
+* @return the updated Option
+*/
+   public Option choices(T... choices) {
+   Collections.addAll(this.choices, choices);
--- End diff --

Add check that `defaultValue` (if already set) is in choices.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
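
A minimal Java sketch of the check suggested above, written as a fragment of the 
`Option` class shown in the diff (the `defaultValue` and `choices` fields are the 
ones from the PR; the exact exception message and the extra `java.util.Arrays` 
import are illustrative only):

{code}
public Option choices(T... choices) {
    // Reject a choice list that would exclude an already configured default value.
    if (this.defaultValue != null && !Arrays.asList(choices).contains(this.defaultValue)) {
        throw new IllegalArgumentException(
                "Default value " + this.defaultValue + " is not in the list of choices.");
    }
    Collections.addAll(this.choices, choices);
    return this;
}
{code}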


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745161#comment-14745161
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492448
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/Option.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Internal representation of a parameter passed to a user defined 
function.
+ */
+public class Option {
+
+   private String longName;
+   private String shortName;
+
+   private T defaultValue;
+   private Set choices;
+
+   private String helpText;
+
+   public Option(String name) {
+   this.longName = name;
+   this.choices = new HashSet<>();
+   }
+
+   /**
+* Define a alternative / short name of the Parameter.
+*
+* @param shortName - short version of the parameter name
+* @return the updated Option
+*/
+   public Option alt(String shortName) {
+   this.shortName = shortName;
+   return this;
+   }
+
+   /**
+* Define a default value for the option.
+*
+* Throws an exception if the list of possible values for the parameter 
is not empty and the default value passed
+* is not in the list.
+*
+* @param defaultValue - the default value
+* @return the updated Option
+*/
+   public Option defaultValue(T defaultValue) {
+   if (this.choices.isEmpty()) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   if (this.choices.contains(defaultValue)) {
+   return this.setDefaultValue(defaultValue);
+   } else {
+   throw new 
IllegalArgumentException("defaultValue passed is not in the list of expected 
values.");
+   }
+   }
+   }
+
+   /**
+* Restrict the list of possible values of the parameter.
+*
+* @param choices - the allowed values of the parameter.
+* @return the updated Option
+*/
+   public Option choices(T... choices) {
+   Collections.addAll(this.choices, choices);
--- End diff --

Add check that `defaultValue` (if already set) is in choices.


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data sou

[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492715
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
--- End diff --

Check overwriting of existing option with same name?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
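
A minimal Java sketch of the suggested guard, as a fragment of the 
`RequiredParameter` class from the diff; whether to throw or to deliberately 
overwrite is of course up to the author:

{code}
public void add(Option option) {
    // Fail fast instead of silently overwriting an option registered under the same name.
    if (this.data.containsKey(option.getName())) {
        throw new IllegalArgumentException(
                "Option with the name " + option.getName() + " was already defined.");
    }
    this.data.put(option.getName(), option);
}
{code}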


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745167#comment-14745167
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39492715
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
--- End diff --

Check overwriting of existing option with same name?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493200
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Check alternative name as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
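
A minimal Java sketch of how `check()` could also accept the short name, as a 
fragment of the class in the diff; `getAlt()` is an assumed getter for the short 
name set via `alt(...)`, not existing API:

{code}
// True if the option was supplied under either its long or its (optional) short name.
private boolean isSupplied(Option o, ParameterTool parameterTool) {
    String longName = o.getName();
    String shortName = o.getAlt();  // assumed getter for the short name
    boolean byLongName = parameterTool.data.containsKey(longName)
            && !keyIsUndefined(longName, parameterTool.data);
    boolean byShortName = shortName != null
            && parameterTool.data.containsKey(shortName)
            && !keyIsUndefined(shortName, parameterTool.data);
    return byLongName || byShortName;
}
{code}

`check()` (and `checkAndPopulate()`) could then call this helper instead of 
looking only at the long name.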


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745170#comment-14745170
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493200
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Check alternative name as well?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745171#comment-14745171
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493290
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
+   throw new RequiredParameterException("Required 
parameter " + o.getName() + " not present.");
+   }
+   }
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied. If 
not use the default values
+* which have been supplied. If no default value is supplied for a 
missing parameter, an exception is thrown.
+*
+* @param parameterTool - parameters supplied by the user.
+*/
+   public void checkAndPopulate(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   String key = o.getName();
--- End diff --

Check for alternative / short key as well


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "b

[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39493290
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
+   throw new RequiredParameterException("Required 
parameter " + o.getName() + " not present.");
+   }
+   }
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied. If 
not use the default values
+* which have been supplied. If no default value is supplied for a 
missing parameter, an exception is thrown.
+*
+* @param parameterTool - parameters supplied by the user.
+*/
+   public void checkAndPopulate(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   String key = o.getName();
--- End diff --

Check for alternative / short key as well


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
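
In the same spirit, a minimal sketch of how `checkAndPopulate()` could resolve a 
value: long name first, then the short name, then the default value; `getAlt()` 
and `getDefaultValue()` are assumed getters here, not existing API:

{code}
private String resolveValue(Option o, ParameterTool parameterTool) throws RequiredParameterException {
    String longName = o.getName();
    String shortName = o.getAlt();              // assumed getter for the short name
    if (parameterTool.data.containsKey(longName) && !keyIsUndefined(longName, parameterTool.data)) {
        return parameterTool.data.get(longName);
    }
    if (shortName != null && parameterTool.data.containsKey(shortName)
            && !keyIsUndefined(shortName, parameterTool.data)) {
        return parameterTool.data.get(shortName);
    }
    if (o.getDefaultValue() != null) {          // assumed getter for the default value
        return String.valueOf(o.getDefaultValue());
    }
    throw new RequiredParameterException(
            "Required parameter " + longName + " not present and no default value set.");
}
{code}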


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493894
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Thanks, added a lower bound.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493902
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
--- End diff --

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493898
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Yes, actually it needs much less. This is just some static initialization 
code Netty runs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493879
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Yes, actually it is (HEAPSIZE /  fraction) * (1.0 - fraction) :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
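
Purely as an illustration of the expressions being discussed in this thread (not 
a statement of which one is intended): assuming the start script derives the JVM 
heap as total * (1 - fraction), e.g. a total of 1000 MB and fraction = 0.7 giving 
a 300 MB heap and a 700 MB managed off-heap target, the three candidates evaluate 
as follows.

{code}
// Example numbers only; 'heap' and 'fraction' are assumptions for illustration.
public class OffHeapSizeSketch {
    public static void main(String[] args) {
        double heap = 300.0;       // MB, -Xmx as assumed to be set by the start script
        double fraction = 0.7;     // taskmanager.memory.fraction

        double current  = heap / fraction;                       // ~428.6 MB (expression in the PR)
        double variantA = heap / (1.0 - fraction) - heap;        //  700.0 MB
        double variantB = (heap / fraction) * (1.0 - fraction);  // ~128.6 MB

        System.out.printf("current=%.1f variantA=%.1f variantB=%.1f%n",
                current, variantA, variantB);
    }
}
{code}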


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140343995
  
Thanks for the PR @mliesenberg!
Can you add the checks (and tests verifying that the checks work) that I 
mentioned in my inline comments?
The original JIRA issue also included type checks. I think we could restrict 
those to the Java primitive wrapper types (Integer, Long, Double, Float, 
Boolean). I would be OK with merging this PR without type checks (and opening 
another JIRA for them), but it would be a cool feature if you'd like to add it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
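
A minimal Java sketch of what such a value check could look like for the wrapper 
types mentioned above; a `type(Class)` setter on `Option` is an assumption here, 
not existing API:

{code}
// Returns true if 'value' can be parsed as the given wrapper type; a null 'type' means no restriction.
private static boolean isValidFor(Class<?> type, String value) {
    if (type == null) {
        return true;
    }
    try {
        if (type == Integer.class) { Integer.parseInt(value);    return true; }
        if (type == Long.class)    { Long.parseLong(value);      return true; }
        if (type == Double.class)  { Double.parseDouble(value);  return true; }
        if (type == Float.class)   { Float.parseFloat(value);    return true; }
        if (type == Boolean.class) {
            return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
        }
        return true;  // other types are not checked
    } catch (NumberFormatException e) {
        return false;
    }
}
{code}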


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745181#comment-14745181
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493894
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
+
+if useOffHeapMemory; then
+if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
+# We split up the total memory in heap and off-heap memory
+if [[ "${FLINK_TM_HEAP}" -le 
"${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
+echo "[ERROR] Configured TaskManager memory size 
('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size 
('${KEY_TASKM_MEM_MANAGED_SIZE}')."
+exit 1
+fi
+TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
+TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
+else
+# We calculate the memory using a fraction of the total 
memory
+if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 
1.0"` != "0" ]]; then
+echo "[ERROR] Configured TaskManager managed memory 
fraction is not a valid value. Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' 
in ${FLINK_CONF_FILE}"
--- End diff --

Thanks, added a lower bound.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493933
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745183#comment-14745183
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493898
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initilization
+NETTY_BUFFERS=$((1024 * 1024))
--- End diff --

Yes, actually it needs much less. This is just some static initialization 
code Netty runs.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745185#comment-14745185
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493918
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Thanks.


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745180#comment-14745180
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493879
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Yes, actually it is (HEAPSIZE /  fraction) * (1.0 - fraction) :)


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745182#comment-14745182
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140343995
  
Thanks for the PR @mliesenberg!
Can you add the checks (and tests to verify the checks are working) that I 
mentioned in my comments?
The original JIRA issue also included type checks. I think we could 
restrict those to Java primitives (Integer, Long, Double, Float, Boolean). 
I would be OK with adding this PR without type checks (and opening another 
JIRA for that) but it would be a cool feature if you'd like to add that.


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745184#comment-14745184
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493902
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -51,13 +51,41 @@ if [[ $STARTSTOP == "start" ]]; then
 fi
 fi
 
-if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" 
-lt "0" ]]; then
+echo "[ERROR] Configured TaskManager JVM heap size is not a 
number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
 exit 1
 fi
 
-if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m 
-Xmx"$FLINK_TM_HEAP"m"
+if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+
+TM_HEAP_SIZE=${FLINK_TM_HEAP}
+TM_OFFHEAP_SIZE=0
+# some space for Netty initialization
--- End diff --

Thanks


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493918
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
--- End diff --

Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745186#comment-14745186
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39493933
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
 
-  relativeMemSize
+LOG.info(s"Using $fraction of the maximum memory size for " +
+  s"Flink managed off-heap memory (${directMemorySize >> 20} MB).")
+
+directMemorySize
+  } else {
--- End diff --

Thanks


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39494490
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Never mind, it's (HEAPSIZE / (1.0 - fraction)) * fraction..
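
To make that concrete with illustrative numbers: with fraction = 0.7 and an original budget of 1000 MB, the heap has been shrunk to 1000 * (1 - 0.7) = 300 MB, so the off-heap size comes out as (300 / (1 - 0.7)) * 0.7 = 700 MB, i.e. the managed memory gets the remaining 70% of the original budget.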


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745193#comment-14745193
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/1129#discussion_r39494490
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1586,32 +1586,29 @@ object TaskManager {

ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
"MemoryManager fraction of the free memory must 
be between 0.0 and 1.0")
 
-  val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
-fraction).toLong
+  if (memType == MemoryType.HEAP) {
 
-  LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
-s" heap memory (${relativeMemSize >> 20} MB).")
+val relativeMemSize = 
(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() *
+  fraction).toLong
 
-  relativeMemSize
-}
-else {
-  val ratio = configuration.getFloat(
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO)
-  
-  checkConfigParameter(ratio > 0.0f,
-ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY,
-"MemoryManager ratio (off-heap memory / heap size) must be larger 
than zero")
-  
-  val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory()
-  val relativeMemSize = (maxHeapSize * ratio).toLong
+LOG.info(s"Using $fraction of the currently free heap space for 
Flink managed " +
+  s" heap memory (${relativeMemSize >> 20} MB).")
+
+relativeMemSize
+  } else if (memType == MemoryType.OFF_HEAP) {
 
-  LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) 
for Flink " +
-s"managed off-heap memory (${relativeMemSize >> 20} MB).")
+// The maximum heap memory has been adjusted according to the 
fraction
+val directMemorySize = 
(EnvironmentInformation.getMaxJvmHeapMemory() / fraction).toLong
--- End diff --

Never mind, it's (HEAPSIZE / (1.0 - fraction)) * fraction..


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread mliesenberg
Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39495024
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

That would only be necessary and applicable if the check fails, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745203#comment-14745203
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39495024
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

That would only be necessary and applicable if the check fails, right?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread mliesenberg
Github user mliesenberg commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140347682
  
@fhueske Yes, I will add the checks.

Regarding the type checks: Does that refer to the interaction with the 
ParameterTool class? I thought about that as well, but there the values are 
stored as strings, and the expected types are only applied upon retrieval. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2640) Integrate the off-heap configurations with YARN runner

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745210#comment-14745210
 ] 

ASF GitHub Bot commented on FLINK-2640:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1132

[FLINK-2640][yarn] integrate off-heap configuration

This is based on #1129.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink off_heap_config-yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1132


commit 917aad9a7d4b03265ce1dde6c23141383d6a9cf6
Author: Maximilian Michels 
Date:   2015-09-15T09:04:34Z

[FLINK-2640][yarn] integrate off-heap configuration




> Integrate the off-heap configurations with YARN runner
> --
>
> Key: FLINK-2640
> URL: https://issues.apache.org/jira/browse/FLINK-2640
> Project: Flink
>  Issue Type: New Feature
>  Components: YARN Client
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The YARN runner needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39495292
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Both names full and short should be treated equally, no?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2640][yarn] integrate off-heap configur...

2015-09-15 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/1132

[FLINK-2640][yarn] integrate off-heap configuration

This is based on #1129.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink off_heap_config-yarn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1132


commit 917aad9a7d4b03265ce1dde6c23141383d6a9cf6
Author: Maximilian Michels 
Date:   2015-09-15T09:04:34Z

[FLINK-2640][yarn] integrate off-heap configuration




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745211#comment-14745211
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39495292
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Both names full and short should be treated equally, no?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745208#comment-14745208
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user mliesenberg commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140347682
  
@fhueske Yes, I will add the checks.

Regarding the type checks: Does that refer to the interaction with the 
ParameterTool class? I thought about that as well, but there the values are 
stored as strings, and the expected types are only applied upon retrieval. 


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140347951
  
If you add a type field (or enum) to the option, you can check whether the 
string can be cast to the requested type. Since we would only support a fixed 
set of types (the primitives I listed above), this should be quite easy.
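
A rough sketch of what such a check could look like (OptionType, its values, and the helper name are assumptions made up for this example, not code from the PR):

{code}
// Hypothetical helper, not part of the PR: validates that a raw string value
// can be parsed as the type declared on the Option.
enum OptionType { INTEGER, LONG, DOUBLE, FLOAT, BOOLEAN, STRING }

final class OptionTypeCheck {

    static boolean matchesType(String value, OptionType type) {
        try {
            switch (type) {
                case INTEGER: Integer.parseInt(value);   return true;
                case LONG:    Long.parseLong(value);     return true;
                case DOUBLE:  Double.parseDouble(value); return true;
                case FLOAT:   Float.parseFloat(value);   return true;
                case BOOLEAN:
                    // Boolean.parseBoolean() never throws, so compare explicitly.
                    return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
                default:      return true; // plain strings always pass
            }
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
{code}

check() could then call something like matchesType(...) for each supplied value and fail if it returns false.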


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745212#comment-14745212
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140347951
  
If you add a type field (or enum) to the option, you can check whether the 
string can be cast to the requested type. Since we would only support a fixed 
set of types (the primitives I listed above), this should be quite easy.


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745215#comment-14745215
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user mliesenberg commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140348478
  
Ok, I will add that. 


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread mliesenberg
Github user mliesenberg commented on the pull request:

https://github.com/apache/flink/pull/1097#issuecomment-140348478
  
Ok, I will add that. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745222#comment-14745222
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39496043
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Mhm. Makes sense. :)

So in the end, the user can retrieve the parameter both with the short and 
the long name?
So it basically needs to be added twice if there is a short and a long name?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread mliesenberg
Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39496043
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Mhm. Makes sense. :)

So in the end, the user can retrieve the parameter both with the short and 
the long name?
So it basically needs to be added twice if there is a short and a long name?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2641) Integrate the off-heap memory configuration with the TaskManager start script

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745227#comment-14745227
 ] 

ASF GitHub Bot commented on FLINK-2641:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140349703
  
Thanks for the update.
Good to merge, IMO


> Integrate the off-heap memory configuration with the TaskManager start script
> -
>
> Key: FLINK-2641
> URL: https://issues.apache.org/jira/browse/FLINK-2641
> Project: Flink
>  Issue Type: New Feature
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> The TaskManager start script needs to adjust the {{-Xmx}}, {{-Xms}}, and 
> {{-XX:MaxDirectMemorySize}} parameters according to the off-heap memory 
> settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2641] integrate off-heap memory configu...

2015-09-15 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1129#issuecomment-140349703
  
Thanks for the update.
Good to merge, IMO


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39496768
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

The RequiredParameter tool should check for each Option that:
- the ParameterTool contains a value for either the full or short key (if 
short key is defined)
- the ParameterTool does not contain values for both full and short key (if 
short key is defined)
- if the value is set on the short key, it should be copied to the full key

The checkAndPopulate method should only add the default value to the full 
key

That way, the value can be retrieved using the full key.

What do you think?
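
A minimal sketch of that logic inside RequiredParameter.check(), assuming hypothetical Option accessors (getName(), hasAlt(), getAlt()) that are not necessarily what the PR defines:

{code}
// Hypothetical sketch of the full/short key handling; accessor names are assumptions.
public void check(ParameterTool parameterTool) throws RequiredParameterException {
    for (Option o : data.values()) {
        String fullKey = o.getName();
        String shortKey = o.hasAlt() ? o.getAlt() : null;

        boolean hasFull = parameterTool.data.containsKey(fullKey);
        boolean hasShort = shortKey != null && parameterTool.data.containsKey(shortKey);

        if (!hasFull && !hasShort) {
            throw new RequiredParameterException("Required parameter '" + fullKey + "' is missing.");
        }
        if (hasFull && hasShort) {
            throw new RequiredParameterException(
                "Parameter '" + fullKey + "' was given on both the full and the short key.");
        }
        if (hasShort) {
            // normalize so the value can always be retrieved via the full key
            parameterTool.data.put(fullKey, parameterTool.data.get(shortKey));
        }
    }
}
{code}

checkAndPopulate() would then only have to write the default value under the full key when neither key was supplied but a default exists.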


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745233#comment-14745233
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39496768
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

The RequiredParameter tool should check for each Option that:
- the ParameterTool contains a value for either the full or short key (if 
short key is defined)
- the ParameterTool does not contain values for both full and short key (if 
short key is defined)
- if the value is set on the short key, it should be copied to the full key

The checkAndPopulate method should only add the default value to the full 
key

That way, the value can be retrieved using the full key.

What do you think?


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>  

[GitHub] flink pull request: [FLINK-2017] Add predefined required parameter...

2015-09-15 Thread mliesenberg
Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39497481
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Sounds good to me. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2017) Add predefined required parameters to ParameterTool

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745245#comment-14745245
 ] 

ASF GitHub Bot commented on FLINK-2017:
---

Github user mliesenberg commented on a diff in the pull request:

https://github.com/apache/flink/pull/1097#discussion_r39497481
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameter.java 
---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Facility to manage required parameters in user defined functions.
+ */
+public class RequiredParameter {
+
+   private static final String HELP_TEXT_PARAM_DELIMITER = "\t";
+   private static final String HELP_TEXT_LINE_DELIMITER = "\n";
+
+   private HashMap<String, Option> data;
+
+   public RequiredParameter() {
+   this.data = new HashMap<>();
+   }
+
+   public void add(Option option) {
+   this.data.put(option.getName(), option);
+   }
+
+   /**
+* Check if all parameters defined as required have been supplied.
+*
+* @param parameterTool - parameters supplied by user.
+*/
+   public void check(ParameterTool parameterTool) throws 
RequiredParameterException {
+   for (Option o : data.values()) {
+   // if the parameter is not present or its value is 
undefined, throw a RuntimeException.
+   if (!parameterTool.data.containsKey(o.getName()) || 
keyIsUndefined(o.getName(), parameterTool.data)) {
--- End diff --

Sounds good to me. 


> Add predefined required parameters to ParameterTool
> ---
>
> Key: FLINK-2017
> URL: https://issues.apache.org/jira/browse/FLINK-2017
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 0.9
>Reporter: Robert Metzger
>  Labels: starter
>
> In FLINK-1525 we've added the {{ParameterTool}}.
> During the PR review, there was a request for required parameters.
> This issue is about implementing a facility to define required parameters. 
> The tool should also be able to print a help menu with a list of all 
> parameters.
> This test case shows my initial ideas how to design the API
> {code}
>   @Test
>   public void requiredParameters() {
>   RequiredParameters required = new RequiredParameters();
>   Option input = required.add("input").alt("i").help("Path to 
> input file or directory"); // parameter with long and short variant
>   required.add("output"); // parameter only with long variant
>   Option parallelism = 
> required.add("parallelism").alt("p").type(Integer.class); // parameter with 
> type
>   Option spOption = 
> required.add("sourceParallelism").alt("sp").defaultValue(12).help("Number 
> specifying the number of parallel data source instances"); // parameter with 
> default value, specifying the type.
>   Option executionType = 
> required.add("executionType").alt("et").defaultValue("pipelined").choices("pipelined",
>  "batch");
>   ParameterUtil parameter = ParameterUtil.fromArgs(new 
> String[]{"-i", "someinput", "--output", "someout", "-p", "15"});
>   required.check(parameter);
>   required.printHelp();
>   required.checkAndPopulate(parameter);
>   String inputString = input.get();
>   int par = parallelism.getInteger();
>   String output = parameter.get("output");
>   int sourcePar = parameter.getInteger(spOption.getName());
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

2015-09-15 Thread twalthr
Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140355813
  
Hi @chiwanpark, yes, I think splitting it up makes sense. I just opened this 
PR to get some feedback and to show why my changes are necessary to integrate 
new input formats like HCatalog. You can ignore the `HCatTableSource` class, as 
it is still untested anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2167) Add fromHCat() to TableEnvironment

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745260#comment-14745260
 ] 

ASF GitHub Bot commented on FLINK-2167:
---

Github user twalthr commented on the pull request:

https://github.com/apache/flink/pull/1127#issuecomment-140355813
  
Hi @chiwanpark, yes, I think splitting it up makes sense. I just opened this 
PR to get some feedback and to show why my changes are necessary to integrate 
new input formats like HCatalog. You can ignore the `HCatTableSource` class, as 
it is still untested anyway.


> Add fromHCat() to TableEnvironment
> --
>
> Key: FLINK-2167
> URL: https://issues.apache.org/jira/browse/FLINK-2167
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Minor
>  Labels: starter
>
> Add a {{fromHCat()}} method to the {{TableEnvironment}} to read a {{Table}} 
> from an HCatalog table.
> The implementation could reuse Flink's HCatInputFormat.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK1919] add HCatOutputFormat

2015-09-15 Thread jamescao
Github user jamescao commented on the pull request:

https://github.com/apache/flink/pull/1079#issuecomment-140359144
  
@chiwanpark 
The PR is now updated. I pulled out the code related to HCatInputFormat and 
incorporated FLINK-2555 and FLINK-2617. I also changed the test environment 
from `CollectionsEnviroment` to `LocalEnviroment` to cover testing the 
deserialization of the HCatOutputFormat.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2667) Rework configuration parameters

2015-09-15 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745287#comment-14745287
 ] 

Maximilian Michels commented on FLINK-2667:
---

You're right. That's a shortcoming. Let's add the {{mb}} suffix to all memory 
parameters.

{quote}
Why aren't we following a soft approach for the configuration parameter 
changes: in the next release (0.10) support both keys, and log a deprecation 
warning for each used old config parameter. In the next release (0.11), we 
completely remove the old config parameters?
{quote}

That might still leave users who skip a version in pain, but it is much better 
than immediately dropping the old configuration parameters.
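
As an illustration of the soft approach, here is a minimal sketch of resolving 
a deprecated key with a warning. The plain map and the new key name 
{{taskmanager.memory.mb}} (the proposed key plus the {{mb}} suffix) are 
assumptions for the example, not the actual Flink configuration code:

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch only: prefer the new key, fall back to the deprecated one with a warning.
public class DeprecatedKeySketch {

	static String get(Map<String, String> conf, String newKey, String deprecatedKey, String def) {
		if (conf.containsKey(newKey)) {
			return conf.get(newKey);
		}
		if (conf.containsKey(deprecatedKey)) {
			System.err.println("WARN: config key '" + deprecatedKey
					+ "' is deprecated, please use '" + newKey + "' instead");
			return conf.get(deprecatedKey);
		}
		return def;
	}

	public static void main(String[] args) {
		Map<String, String> conf = new HashMap<>();
		conf.put("taskmanager.heap.mb", "1024"); // old key from an existing config file
		// resolves via the deprecated key and logs a warning
		System.out.println(get(conf, "taskmanager.memory.mb", "taskmanager.heap.mb", "512"));
	}
}
{code}

In 0.11 the fallback branch would then simply be removed.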



> Rework configuration parameters
> ---
>
> Key: FLINK-2667
> URL: https://issues.apache.org/jira/browse/FLINK-2667
> Project: Flink
>  Issue Type: Improvement
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Maximilian Michels
> Fix For: 0.10
>
>
> In the course of FLINK-2641, we came up with some changes to make the 
> configuration parameters more meaningful and self-explanatory. Some concerns 
> about backwards-compatibility were raised in the associated pull request: 
> https://github.com/apache/flink/pull/1125. That's why I decided to delay 
> those changes. 
> Here are the changes to the configuration which I would like to propose:
> {{taskmanager.memory.size}} --> {{taskmanager.memory.managed}}
> {{taskmanager.memory.fraction}} --> {{taskmanager.memory.managed.fraction}} 
> {{taskmanager.heap.mb}} --> {{taskmanager.memory}}
> (Change its meaning to combined JVM heap + off-heap memory)
> {{jobmanager.heap.mb}} --> {{jobmanager.memory}}
> {{taskmanager.network.numberOfBuffers}} --> {{taskmanager.network.memory}} 
> (Specify the network size in terms of space and not in terms of the number of 
> buffers)
> I think those changes would make configuration easier and improve the overall 
> user experience of Flink. The drawback is that it requires some users to 
> update their config files. I believe the negative impact will be very little 
> because only two of the changed parameters are present in the default config. 
> Adapting to these new parameters should be easy because they make a lot more 
> sense but I wanted to put this up for debate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2667) Rework configuration parameters

2015-09-15 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-2667:
-

Assignee: Maximilian Michels

> Rework configuration parameters
> ---
>
> Key: FLINK-2667
> URL: https://issues.apache.org/jira/browse/FLINK-2667
> Project: Flink
>  Issue Type: Improvement
>  Components: Start-Stop Scripts
>Affects Versions: 0.10
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 0.10
>
>
> In the course of FLINK-2641, we came up with some changes to make the 
> configuration parameters more meaningful and self-explanatory. Some concerns 
> about backwards-compatibility were raised in the associated pull request: 
> https://github.com/apache/flink/pull/1125. That's why I decided to delay 
> those changes. 
> Here are the changes to the configuration which I would like to propose:
> {{taskmanager.memory.size}} --> {{taskmanager.memory.managed}}
> {{taskmanager.memory.fraction}} --> {{taskmanager.memory.managed.fraction}} 
> {{taskmanager.heap.mb}} --> {{taskmanager.memory}}
> (Change its meaning to combined JVM heap + off-heap memory)
> {{jobmanager.heap.mb}} --> {{jobmanager.memory}}
> {{taskmanager.network.numberOfBuffers}} --> {{taskmanager.network.memory}} 
> (Specify the network size in terms of space and not in terms of the number of 
> buffers)
> I think those changes would make configuration easier and improve the overall 
> user experience of Flink. The drawback is that it requires some users to 
> update their config files. I believe the negative impact will be very little 
> because only two of the changed parameters are present in the default config. 
> Adapting to these new parameters should be easy because they make a lot more 
> sense but I wanted to put this up for debate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2673) Scala API does not support Option type as key

2015-09-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2673:


 Summary: Scala API does not support Option type as key
 Key: FLINK-2673
 URL: https://issues.apache.org/jira/browse/FLINK-2673
 Project: Flink
  Issue Type: Wish
  Components: Scala API
Reporter: Till Rohrmann
Priority: Minor


The Scala API does not support the {{Option}} type as a key. It could be useful 
to allow grouping on a field with this type.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2674) Rework windowing logic

2015-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2674:
---

 Summary: Rework windowing logic
 Key: FLINK-2674
 URL: https://issues.apache.org/jira/browse/FLINK-2674
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Priority: Critical
 Fix For: 0.10


The windowing logic needs a major overhaul. This follows the design documents: 
  - https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
  - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=60624830

Specifically, the following shortcomings need to be addressed:

  - Global parallel windows should be dropped
   -> for time, local windows are aligned and serve the same purpose
   -> there is currently no known robust and efficient parallel 
implementation of custom strategies 

  - Event time and out of order arrival needs to be supported

  - Eviction of keys that are not accessed does not work; such keys linger 
indefinitely

  - Performance is currently bad for time windows, due to an overly general 
implementation

  - Resources are leaking and threads are not shut down

  - Elements are stored multiple times (discretizers, window buffers)

  - Finally, many implementations are buggy and produce wrong results



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2675) Add utilities for scheduled triggers

2015-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2675:
---

 Summary: Add utilities for scheduled triggers
 Key: FLINK-2675
 URL: https://issues.apache.org/jira/browse/FLINK-2675
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.10


These utilities help schedule triggers for the future, ensure non-concurrent 
trigger execution, and guarantee proper trigger shutdown and release.
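
A rough sketch of what such a utility could look like (illustrative only, all 
names made up): triggers are scheduled on a single-threaded timer, executed 
under a shared lock so they never run concurrently, and the timer thread is 
released on shutdown.

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch, not the actual implementation.
public class TriggerSchedulerSketch implements AutoCloseable {

	private final ScheduledExecutorService timer =
			Executors.newSingleThreadScheduledExecutor();
	private final Object lock = new Object();

	public void registerTrigger(long delayMillis, final Runnable trigger) {
		timer.schedule(new Runnable() {
			@Override
			public void run() {
				synchronized (lock) { // non-concurrent trigger execution
					trigger.run();
				}
			}
		}, delayMillis, TimeUnit.MILLISECONDS);
	}

	@Override
	public void close() {
		timer.shutdownNow(); // release the timer thread
	}

	public static void main(String[] args) throws Exception {
		try (TriggerSchedulerSketch scheduler = new TriggerSchedulerSketch()) {
			scheduler.registerTrigger(100, new Runnable() {
				@Override
				public void run() {
					System.out.println("trigger fired");
				}
			});
			Thread.sleep(200);
		}
	}
}
{code}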



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2668) ProjectOperator method to close projection

2015-09-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-2668:
--
Description: 
I have come across an issue in my code where I called project(...) on a 
{{DataSet}} which was already a {{ProjectOperator}}. Instead of reducing the 
number of fields from 2 to 1, this increased the number of fields from 2 
to 3, resulting in 
{{org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: 
Tuple arity '3' expected but was '1'.}} when processing the next operator.

This can be resolved by adding an optional explicit call to conclude the 
projection, perhaps {{ProjectOperator.closeProjection()}}. Can this be done 
without creating a new no-op operator?

  was:
I have come across an issue in my code where I called project(...) on a DataSet 
which was already a ProjectOperator. Instead of reducing the number of fields 
from 2 to 1 this instead increased the number of fields from 2 to 3 resulting 
in org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: 
Tuple arity '3' expected but was '1'. when processing the next operator.

This can be resolved by adding an optional explicit call to conclude the 
projection, perhaps ProjectOperator.closeProjection(). Can this be done without 
creating a new no-op operator?


> ProjectOperator method to close projection
> --
>
> Key: FLINK-2668
> URL: https://issues.apache.org/jira/browse/FLINK-2668
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Affects Versions: master
>Reporter: Greg Hogan
>Priority: Minor
>
> I have come across an issue in my code where I called project(...) on a 
> {{DataSet}} which was already a {{ProjectOperator}}. Instead of reducing the 
> number of fields from 2 to 1, this increased the number of fields from 
> 2 to 3, resulting in 
> {{org.apache.flink.api.common.functions.InvalidTypesException: Input 
> mismatch: Tuple arity '3' expected but was '1'.}} when processing the next 
> operator.
> This can be resolved by adding an optional explicit call to conclude the 
> projection, perhaps {{ProjectOperator.closeProjection()}}. Can this be done 
> without creating a new no-op operator?
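
For illustration, a small (untested) sketch of the described scenario; the 
second {{project(...)}} call continues the first projection and appends fields 
instead of starting a new projection:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch of the reported behavior; not a verified reproduction.
public class ProjectionSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> pairs = env.fromElements(
				new Tuple2<Integer, String>(1, "a"),
				new Tuple2<Integer, String>(2, "b"));

		// Intention: keep both fields first, later narrow down to field 0.
		// Effect: the second project(...) continues the first projection,
		// yielding a 3-field tuple instead of a 1-field tuple.
		DataSet<?> projected = pairs.project(0, 1).project(0);

		projected.print();
	}
}
{code}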



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2676) Add abstraction for keyed window state

2015-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2676:
---

 Summary: Add abstraction for keyed window state
 Key: FLINK-2676
 URL: https://issues.apache.org/jira/browse/FLINK-2676
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 0.10


This abstraction should help to seamlessly switch between window state kept as
  - Heap-resident maps
  - Managed memory spillable maps
  - key/value state backend state

I would approach this abstraction once we have implemented a few window operators 
and see what operations we need, such as the following (a rough interface sketch 
is given after the list):
  - Drop time-regions across all keys
  - Append to state for key
  - Update/replace state by key
  - Iterate over unions of multiple state time regions
  - snapshot time regions completely / incrementally
  - (possibly more)
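
A rough sketch of what such an abstraction could look like, derived only from 
the operations listed above; all names and signatures are illustrative, not 
actual Flink interfaces:

{code}
// Illustrative only; derived from the operation list above.
public interface KeyedWindowState<K, V> {

	/** Append a value to the state kept for a key within a time region. */
	void append(K key, long windowStart, V value);

	/** Replace the state kept for a key within a time region. */
	void replace(K key, long windowStart, Iterable<V> newState);

	/** Drop a time region across all keys, e.g. when a window is purged. */
	void dropTimeRegion(long windowStart);

	/** Iterate over the union of several time regions for one key. */
	Iterable<V> union(K key, long... windowStarts);

	/** Snapshot a time region, either completely or incrementally. */
	byte[] snapshot(long windowStart, boolean incremental);
}
{code}

Such an interface could then be backed by heap-resident maps, managed-memory 
spillable maps, or a key/value state backend, as listed above.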




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2677) Add a general-purpose keyed-window operator

2015-09-15 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2677:
---

 Summary: Add a general-purpose keyed-window operator
 Key: FLINK-2677
 URL: https://issues.apache.org/jira/browse/FLINK-2677
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
 Fix For: 0.10






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2677) Add a general-purpose keyed-window operator

2015-09-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-2677:

Assignee: Aljoscha Krettek

> Add a general-purpose keyed-window operator
> ---
>
> Key: FLINK-2677
> URL: https://issues.apache.org/jira/browse/FLINK-2677
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

