[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457726#comment-15457726
 ] 

ASF GitHub Bot commented on FLINK-4439:
---

Github user gheo21 commented on the issue:

https://github.com/apache/flink/pull/2397
  
Hi @rmetzger,
Thanks for taking a look. Do I need to do anything else to get it merged? 



> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> --
>
> Key: FLINK-4439
> URL: https://issues.apache.org/jira/browse/FLINK-4439
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.3
>Reporter: Gheorghe Gheorghe
>Priority: Minor
>
> The "flink-connector-kafka-0.8_2" connector logs the following error when all 
> 'bootstrap.servers' passed to the FlinkKafkaConsumer08 are invalid. 
> See stacktrace: 
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating 
> with broker inexistentKafkHost:9092 to find partitions for [testTopic].class 
> java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
>   at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
>   at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:193)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:164)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.(FlinkKafkaConsumer08.java:131)
>   at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
>   at MetricsFromKafka.main(MetricsFromKafka.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at sbt.Run.invokeMain(Run.scala:67)
>   at sbt.Run.run0(Run.scala:61)
>   at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
>   at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
>   at sbt.Logger$$anon$4.apply(Logger.scala:84)
>   at sbt.TrapExit$App.run(TrapExit.scala:248)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> In the above stacktrace it is hard to figure out that the servers 
> provided in the config cannot be resolved to a valid IP address. Moreover, the 
> Flink Kafka consumer will try all of those servers one by one, failing each 
> time to get partition information.
> The suggested improvement is to fail fast and inform the user that the 
> servers provided in the 'bootstrap.servers' config are invalid. If at least 
> one server is valid then the exception should not be thrown. 
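The fail-fast behavior proposed above can be sketched in Java. This is a hypothetical illustration, not code from the Flink connector: the class and method names are invented, and it simply checks whether at least one 'bootstrap.servers' entry resolves to an address before the consumer proceeds.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper illustrating the suggested validation; not part of Flink.
public class BootstrapServersValidator {

    // Returns true if at least one "host:port" entry in the comma-separated
    // list resolves. Per the issue, one valid server should be enough.
    public static boolean atLeastOneResolvable(String bootstrapServers) {
        for (String server : bootstrapServers.split(",")) {
            String host = server.trim().split(":")[0];
            try {
                InetAddress.getByName(host);
                return true; // one resolvable server is sufficient
            } catch (UnknownHostException e) {
                // unresolvable; keep trying the remaining servers
            }
        }
        return false; // none resolved: caller should fail fast
    }

    public static void main(String[] args) {
        System.out.println(atLeastOneResolvable("localhost:9092"));
    }
}
```

With a fully unresolvable list this returns false immediately, so the consumer could throw one descriptive exception up front instead of logging a ClosedChannelException per broker.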



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...

2016-09-01 Thread gheo21
Github user gheo21 commented on the issue:

https://github.com/apache/flink/pull/2397
  
Hi @rmetzger,
Thanks for taking a look. Do I need to do anything else to get it merged? 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4562) table example make an divided module in flink-examples

2016-09-01 Thread shijinkui (JIRA)
shijinkui created FLINK-4562:


 Summary: table example make an divided module in flink-examples
 Key: FLINK-4562
 URL: https://issues.apache.org/jira/browse/FLINK-4562
 Project: Flink
  Issue Type: Improvement
Reporter: shijinkui


Example code shouldn't be packaged in the table module.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4562) table examples make an divided module in flink-examples

2016-09-01 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-4562:
-
Summary: table examples make an divided module in flink-examples  (was: 
table example make an divided module in flink-examples)

> table examples make an divided module in flink-examples
> ---
>
> Key: FLINK-4562
> URL: https://issues.apache.org/jira/browse/FLINK-4562
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> Example code shouldn't be packaged in the table module.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457533#comment-15457533
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4536 at 9/2/16 5:07 AM:


Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used, and didn't seem to be causing any thread 
leaks. So, I would suggest that this particular issue is resolved ;)

As for the {{ProvisionedThroughputExceeded}} failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering {{ProvisionedThroughputExceeded}}, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.
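The tuning described above boils down to two entries in the consumer's configuration properties. A minimal sketch; the literal property keys are assumed to match the string constants behind {{ConsumerConfigConstants}}, and the chosen values (1000 ms interval, 5000 records per fetch) are illustrative, so verify both against your connector version:

```java
import java.util.Properties;

public class KinesisConsumerTuning {
    // Assumed to mirror ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
    // and ConsumerConfigConstants.SHARD_GETRECORDS_MAX; check your Flink version.
    static final String GETRECORDS_INTERVAL_KEY = "flink.shard.getrecords.intervalmillis";
    static final String GETRECORDS_MAX_KEY = "flink.shard.getrecords.maxrecordcount";

    public static Properties tunedConfig() {
        Properties config = new Properties();
        // Poll each shard at most once per second instead of continuously
        // (the default interval is 0), staying under the GetRecords limit.
        config.setProperty(GETRECORDS_INTERVAL_KEY, "1000");
        // Compensate for the lower polling rate by asking Kinesis for
        // more records per fetch.
        config.setProperty(GETRECORDS_MAX_KEY, "5000");
        return config;
    }
}
```

These properties would be passed to the FlinkKinesisConsumer constructor alongside the usual AWS region and credential settings.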



was (Author: tzulitai):
Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used, and didn't seem to be causing any thread 
leaks. So, I would suggest that this particular issue is resolved ;)

As for the {{ProvisionedThroughputExceeded}} failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering the ProvisionedThroughputExceeded, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.


> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM thread

[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457533#comment-15457533
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4536 at 9/2/16 5:07 AM:


Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used, and didn't seem to be causing any thread 
leaks. So, I would suggest that this particular issue is resolved ;)

As for the {{ProvisionedThroughputExceeded}} failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering the ProvisionedThroughputExceeded, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.



was (Author: tzulitai):
Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used, and didn't seem to be causing any thread 
leaks. So, I would suggest that this particular issue is resolved ;)

As for the ProvisionedThroughputExceeded failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering the ProvisionedThroughputExceeded, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.


> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads o

[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457533#comment-15457533
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4536 at 9/2/16 4:58 AM:


Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used, and didn't seem to be causing any thread 
leaks. So, I would suggest that this particular issue is resolved ;)

As for the ProvisionedThroughputExceeded failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering the ProvisionedThroughputExceeded, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.



was (Author: tzulitai):
Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used. So, I would suggest that this particular 
issue is resolved ;)

As for the ProvisionedThroughputExceeded failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering the ProvisionedThroughputExceeded, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.


> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59

[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457533#comment-15457533
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4536:


Hi [~skidder], just to clarify, the thread leak issue should be resolved after 
the fix on your InfluxDB Sink, and like the discussion above, should be 
independent of the Kinesis connector, correct?
The cached thread pool created by the Kinesis connector is also irrelevant to 
the AWS SDK and KCL version used. So, I would suggest that this particular 
issue is resolved ;)

As for the ProvisionedThroughputExceeded failures:
The connector fails after hitting 3 consecutive ProvisionedThroughputExceeded 
exceptions. This is an AWS service usage limitation 
(http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). 
The normal way to tune this is by setting 
{{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value 
(default is 0 interval, at most 1000ms should be sufficient for most cases) in 
the consumer's configuration properties, so that the consumer subtasks don't 
fetch records that frequently, and accordingly also increase 
{{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more 
records on each fetch.
If you are suggesting that the AWS SDK version bump to 1.11.30 somehow reduces 
the chance of encountering the ProvisionedThroughputExceeded, could you open a 
separate JIRA for this so we can continue the discussion over there? As for the 
KCL, it should be irrelevant to the issues you were facing; we're only using a 
single utility class from it to deaggregate Kinesis records.


> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> user: root
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager 

[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457364#comment-15457364
 ] 

ASF GitHub Bot commented on FLINK-4561:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2459

[FLINK-4561] replace all the scala version as a `scala.binary.version` 
property

Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
property defined in the root pom properties; the property defaults to 2.10.
Changes:
1. dependencies that include the Scala version
2. module definitions that include the Scala version
3. Scala version upgraded from 2.11.7 to 2.11.8

Only pom files are modified.
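The replacement pattern can be illustrated with a pom fragment (the module name below is a representative example, and the property values should be checked against the actual root pom):

```xml
<properties>
  <!-- One place to switch the Scala binary version for all modules -->
  <scala.binary.version>2.10</scala.binary.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <!-- was: flink-streaming-scala_2.10 -->
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${project.version}</version>
  </dependency>
</dependencies>
```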

The pull request references the related JIRA issue ("[FLINK-4561] replace 
all the scala version as a `scala.binary.version` property")

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink scala_version_property_replace

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2459


commit e8feaabcb4dc853c6e5b895c2351ea3d96351096
Author: shijinkui 
Date:   2016-09-02T03:14:07Z

[FLINK-4561] replace all the scala version as a `scala.binary.version` 
property
Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
property defined in the root pom properties; the property defaults to 2.10.
Changes:
1. dependencies that include the Scala version
2. module definitions that include the Scala version
3. Scala version upgraded from 2.11.7 to 2.11.8




> replace all the scala version as a `scala.binary.version` property
> --
>
> Key: FLINK-4561
> URL: https://issues.apache.org/jira/browse/FLINK-4561
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
> property defined in the root pom properties; the property defaults to 2.10.
> Changes:
> 1. dependencies that include the Scala version
> 2. module definitions that include the Scala version
> 3. Scala version upgraded from 2.11.7 to 2.11.8



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2459: [FLINK-4561] replace all the scala version as a `s...

2016-09-01 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2459

[FLINK-4561] replace all the scala version as a `scala.binary.version` 
property

Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
property defined in the root pom properties; the property defaults to 2.10.
Changes:
1. dependencies that include the Scala version
2. module definitions that include the Scala version
3. Scala version upgraded from 2.11.7 to 2.11.8

Only pom files are modified.

The pull request references the related JIRA issue ("[FLINK-4561] replace 
all the scala version as a `scala.binary.version` property")

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink scala_version_property_replace

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2459


commit e8feaabcb4dc853c6e5b895c2351ea3d96351096
Author: shijinkui 
Date:   2016-09-02T03:14:07Z

[FLINK-4561] replace all the scala version as a `scala.binary.version` 
property
Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
property defined in the root pom properties; the property defaults to 2.10.
Changes:
1. dependencies that include the Scala version
2. module definitions that include the Scala version
3. Scala version upgraded from 2.11.7 to 2.11.8




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4561) replace all the scala version as a `scala.binary.version` property

2016-09-01 Thread shijinkui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shijinkui updated FLINK-4561:
-
Summary: replace all the scala version as a `scala.binary.version` property 
 (was: replace all the scala version as a property)

> replace all the scala version as a `scala.binary.version` property
> --
>
> Key: FLINK-4561
> URL: https://issues.apache.org/jira/browse/FLINK-4561
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
> property defined in the root pom properties; the property defaults to 2.10.
> Changes:
> 1. dependencies that include the Scala version
> 2. module definitions that include the Scala version
> 3. Scala version upgraded from 2.11.7 to 2.11.8



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4561) replace all the scala version as a property

2016-09-01 Thread shijinkui (JIRA)
shijinkui created FLINK-4561:


 Summary: replace all the scala version as a property
 Key: FLINK-4561
 URL: https://issues.apache.org/jira/browse/FLINK-4561
 Project: Flink
  Issue Type: Improvement
Reporter: shijinkui


Replace every hard-coded Scala version (2.10) with the `scala.binary.version` 
property defined in the root pom properties; the property defaults to 2.10.

Changes:
1. dependencies that include the Scala version
2. module definitions that include the Scala version
3. Scala version upgraded from 2.11.7 to 2.11.8



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4560) enforcer java version as 1.7

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457312#comment-15457312
 ] 

ASF GitHub Bot commented on FLINK-4560:
---

GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2458

[FLINK-4560] enforcer java version as 1.7

[FLINK-4560](https://issues.apache.org/jira/browse/FLINK-4560) enforcer 
java version as 1.7
1. add a Java version enforcement rule to maven-enforcer-plugin
2. upgrade maven-enforcer-plugin to version 1.4.1
so that the required Java version is explicit

Only the root pom file is modified.

The pull request references the related JIRA issue ("[FLINK-4560] enforcer 
java version as 1.7")


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink enforcer_java_version_7

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2458


commit d4fe8b21ea6680832328cc93b620973bee57ab03
Author: shijinkui 
Date:   2016-09-02T02:46:45Z

[FLINK-4560] enforcer java version as 1.7
1. maven-enforcer-plugin add java version enforce
2. maven-enforcer-plugin version upgrade to 1.4.1
explicit require java version




> enforcer java version as 1.7
> 
>
> Key: FLINK-4560
> URL: https://issues.apache.org/jira/browse/FLINK-4560
> Project: Flink
>  Issue Type: Improvement
>Reporter: shijinkui
>
> 1. Add a Java-version rule to the maven-enforcer-plugin configuration.
> 2. Upgrade the maven-enforcer-plugin to 1.4.1.
> This makes the required Java version explicit.





[GitHub] flink pull request #2458: [FLINK-4560] enforcer java version as 1.7

2016-09-01 Thread shijinkui
GitHub user shijinkui opened a pull request:

https://github.com/apache/flink/pull/2458

[FLINK-4560] enforcer java version as 1.7

[FLINK-4560](https://issues.apache.org/jira/browse/FLINK-4560): enforce 
Java version 1.7.
1. Add a Java-version rule to the maven-enforcer-plugin configuration.
2. Upgrade the maven-enforcer-plugin to 1.4.1.
This makes the required Java version explicit.

Only the root pom file is modified.

The pull request references the related JIRA issue ("[FLINK-4560] enforcer 
java version as 1.7")


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shijinkui/flink enforcer_java_version_7

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2458.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2458


commit d4fe8b21ea6680832328cc93b620973bee57ab03
Author: shijinkui 
Date:   2016-09-02T02:46:45Z

[FLINK-4560] enforcer java version as 1.7
1. maven-enforcer-plugin add java version enforce
2. maven-enforcer-plugin version upgrade to 1.4.1
explicit require java version




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4560) enforcer java version as 1.7

2016-09-01 Thread shijinkui (JIRA)
shijinkui created FLINK-4560:


 Summary: enforcer java version as 1.7
 Key: FLINK-4560
 URL: https://issues.apache.org/jira/browse/FLINK-4560
 Project: Flink
  Issue Type: Improvement
Reporter: shijinkui


1. Add a Java-version rule to the maven-enforcer-plugin configuration.
2. Upgrade the maven-enforcer-plugin to 1.4.1.

This makes the required Java version explicit.
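One way to express such a requirement in the root pom is a `requireJavaVersion` rule; the snippet below is a sketch under that assumption, and the exact rule set in the PR may differ:

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-enforcer-plugin</artifactId>
    <version>1.4.1</version>
    <executions>
        <execution>
            <id>enforce-java-version</id>
            <goals>
                <goal>enforce</goal>
            </goals>
            <configuration>
                <rules>
                    <requireJavaVersion>
                        <!-- Require Java 1.7 or later -->
                        <version>[1.7,)</version>
                    </requireJavaVersion>
                </rules>
            </configuration>
        </execution>
    </executions>
</plugin>
```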





[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456481#comment-15456481
 ] 

Scott Kidder commented on FLINK-4536:
-

On second thought, I'll leave the issue open and allow the Flink team to decide 
whether to update the version of the AWS SDK and KCL referenced in the Kinesis 
connector pom.xml.

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Current 
> user: root
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM: OpenJDK 
> 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Maximum heap 
> size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JAVA_HOME: 
> /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Hadoop 
> version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  JVM Options:
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:+UseG1GC
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xms2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - -Xmx2048M
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -XX:MaxDirectMemorySize=8388607T
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties
> 2016-08-30 14:52:50,543 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Program 
> Arguments:
> 2016-08-30 14:52:50,544 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> --configDir
> 2016-08

[jira] [Assigned] (FLINK-3719) WebInterface: Moving the barrier between graph and stats

2016-09-01 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-3719:
-

Assignee: Ivan Mushketyk

> WebInterface: Moving the barrier between graph and stats
> 
>
> Key: FLINK-3719
> URL: https://issues.apache.org/jira/browse/FLINK-3719
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Niels Basjes
>Assignee: Ivan Mushketyk
>
> It would be really useful if the separator between the graphical view of a 
> job topology at the top and the textual overview of the counters at the 
> bottom could be moved up or down.





[jira] [Commented] (FLINK-3719) WebInterface: Moving the barrier between graph and stats

2016-09-01 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456480#comment-15456480
 ] 

Ivan Mushketyk commented on FLINK-3719:
---

I'll implement this.

> WebInterface: Moving the barrier between graph and stats
> 
>
> Key: FLINK-3719
> URL: https://issues.apache.org/jira/browse/FLINK-3719
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Niels Basjes
>
> It would be really useful if the separator between the graphical view of a 
> job topology at the top and the textual overview of the counters at the 
> bottom could be moved up or down.





[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456474#comment-15456474
 ] 

ASF GitHub Bot commented on FLINK-3030:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2448
  
@StephanEwen I've renamed the class as you suggested.
Regarding the CSS change, I don't know why these changes were added. I have 
the required (according to README.md) `gulp` version:
```
> $ gulp -version 
[21:06:35] CLI version 3.9.1
[21:06:35] Local version 3.9.1

```

I wonder if a `.styl` file was changed in one of the previous commits but 
it was not compiled to `.css`.


> Enhance Dashboard to show Execution Attempts
> 
>
> Key: FLINK-3030
> URL: https://issues.apache.org/jira/browse/FLINK-3030
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 0.10.0
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
> Fix For: 1.0.0
>
>
> Currently, the web dashboard shows only the latest execution attempt. We 
> should make all execution attempts and their accumulators available for 
> inspection.
> The REST monitoring API supports this, so it should be a change only to the 
> frontend part.





[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...

2016-09-01 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2448
  
@StephanEwen I've renamed the class as you suggested.
Regarding the CSS change, I don't know why these changes were added. I have 
the required (according to README.md) `gulp` version:
```
> $ gulp -version 
[21:06:35] CLI version 3.9.1
[21:06:35] Local version 3.9.1

```

I wonder if a `.styl` file was changed in one of the previous commits but 
it was not compiled to `.css`.




[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456462#comment-15456462
 ] 

Scott Kidder edited comment on FLINK-4536 at 9/1/16 8:04 PM:
-

Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With 
your help I've been able to fix the Kinesis client disconnects & the thread 
leaks.

First, I implemented a {{close()}} function in the InfluxDBSink to ensure that 
the OkHttpClient used by the InfluxDB client is shut down. The documentation for 
the OkHttp library suggests that this isn't necessary, but, hey, I'll do it 
anyways:
http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html


{code}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.okHttpClient = new OkHttpClient();
this.influxDB = 
InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"), 
System.getenv("INFLUXDB_USERNAME"),
System.getenv("INFLUXDB_PASSWORD"), new 
OkClient(okHttpClient));
this.database = System.getenv("INFLUXDB_DATABASE");
this.retentionPolicy = System.getenv("INFLUXDB_RP");

// Flush every 2000 Points, at least every 100ms
this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
if (this.okHttpClient != null) {

this.okHttpClient.getDispatcher().getExecutorService().shutdown();
this.okHttpClient.getConnectionPool().evictAll();
}
}
{code}

I continued to see Kinesis client disconnects and thread leaks, although they 
were happening at a slower rate. I updated the AWS SDK and KCL referenced by 
the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0:
https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff

Problem solved! There have been no disconnects for the last 2 hours; previously 
there were disconnects happening every 30 minutes or so. Also, the number of 
threads on the Task Managers has been constant. The following link includes 
charts covering the last 3 hours. The changes to the AWS SDK dependency were 
deployed at ~11:50:
http://imgur.com/a/yDi4m

Thank you again for your help. I'll mark this issue as resolved.
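As an aside, thread growth of the kind described in this issue can also be observed from inside the JVM via `ThreadMXBean`. The following is a minimal, self-contained sketch and not part of the original report:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class ThreadCountProbe {

    // Current number of live JVM threads, daemon and non-daemon alike.
    static int liveThreadCount() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return threads.getThreadCount();
    }

    public static void main(String[] args) {
        int before = liveThreadCount();

        // Start a thread that stays alive for a while; the live count rises.
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
            }
        });
        t.setDaemon(true);
        t.start();

        int after = liveThreadCount();
        System.out.println("threads before=" + before + ", after=" + after);
        // A leak shows up as the count never returning to its baseline
        // across repeated job restarts.
    }
}
```

Exporting this count through a metrics reporter (as done with StatsD above) makes the leak visible as a monotonically growing gauge.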


was (Author: skidder):
Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With 
your help I've been able to fix the Kinesis client disconnects & the thread 
leaks.

First, I implemented a {code}code(){code} function in the InfluxDBSink to 
ensure that the {code}OkHttpClient{code} used by the InfluxDB client is 
shut down. The documentation for the OkHttp library suggests that this isn't 
necessary, but, hey, I'll do it anyways:
http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html


{code}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.okHttpClient = new OkHttpClient();
this.influxDB = 
InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"), 
System.getenv("INFLUXDB_USERNAME"),
System.getenv("INFLUXDB_PASSWORD"), new 
OkClient(okHttpClient));
this.database = System.getenv("INFLUXDB_DATABASE");
this.retentionPolicy = System.getenv("INFLUXDB_RP");

// Flush every 2000 Points, at least every 100ms
this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
if (this.okHttpClient != null) {

this.okHttpClient.getDispatcher().getExecutorService().shutdown();
this.okHttpClient.getConnectionPool().evictAll();
}
}
{code}

I continued to see Kinesis client disconnects and thread leaks, although they 
were happening at a slower rate. I updated the AWS SDK and KCL referenced by 
the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0:
https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff

Problem solved! There have been no disconnects for the last 2 hours; previously 
there were disconnects happening every 30 minutes or so. Also, the number of 
threads on the Task Managers has been constant. The following link includes 
charts covering the last 3 hours. The changes to the AWS SDK dependency were 
deployed at ~11:50:
http://imgur.com/a/yDi4m

Thank you again for your help. I'll mark this issue as resolved.

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
>

[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager

2016-09-01 Thread Scott Kidder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456462#comment-15456462
 ] 

Scott Kidder commented on FLINK-4536:
-

Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With 
your help I've been able to fix the Kinesis client disconnects & the thread 
leaks.

First, I implemented a {{close()}} function in the InfluxDBSink to 
ensure that the {{OkHttpClient}} used by the InfluxDB client is 
shut down. The documentation for the OkHttp library suggests that this isn't 
necessary, but, hey, I'll do it anyways:
http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html


{code}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.okHttpClient = new OkHttpClient();
this.influxDB = 
InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"), 
System.getenv("INFLUXDB_USERNAME"),
System.getenv("INFLUXDB_PASSWORD"), new 
OkClient(okHttpClient));
this.database = System.getenv("INFLUXDB_DATABASE");
this.retentionPolicy = System.getenv("INFLUXDB_RP");

// Flush every 2000 Points, at least every 100ms
this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
if (this.okHttpClient != null) {

this.okHttpClient.getDispatcher().getExecutorService().shutdown();
this.okHttpClient.getConnectionPool().evictAll();
}
}
{code}

I continued to see Kinesis client disconnects and thread leaks, although they 
were happening at a slower rate. I updated the AWS SDK and KCL referenced by 
the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0:
https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff

Problem solved! There have been no disconnects for the last 2 hours; previously 
there were disconnects happening every 30 minutes or so. Also, the number of 
threads on the Task Managers has been constant. The following link includes 
charts covering the last 3 hours. The changes to the AWS SDK dependency were 
deployed at ~11:50:
http://imgur.com/a/yDi4m

Thank you again for your help. I'll mark this issue as resolved.

> Possible thread leak in Task Manager
> 
>
> Key: FLINK-4536
> URL: https://issues.apache.org/jira/browse/FLINK-4536
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 
> 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a 
> Kinesis stream configured with 2 shards. I've limited the Kinesis source to a 
> parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput 
> limits being hit. The application automatically restarts, and resumes 
> processing with the checkpoint stored on the Job Manager with no outward 
> indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the 
> number of threads running on each Task Manager increases by about 20 threads 
> each time the application restarts. Over the course of a day the application 
> might hit provisioned-throughput limits 20 times or so (this is not fully 
> production yet, so hitting these limits is acceptable for now). But the 
> number of threads continues to grow unbounded with no increase in workload on 
> the Task Managers.
> The following link includes charts for the overall Flink cluster performance 
> & Task Manager JVM threads over the course of 12 hours:
> http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being 
> restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  - 
> 
> 2016-08-30 14:52:50,540 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager  -  Starting 
> TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 1

[jira] [Commented] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456392#comment-15456392
 ] 

ASF GitHub Bot commented on FLINK-3680:
---

GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2457

[FLINK-3680] Remove "(not set)" text in the Job Plan UI

This PR removes the "(not set)" text in the web frontend.

Currently it looks like this: 
https://issues.apache.org/jira/secure/attachment/12796006/Screen%20Shot%202016-03-29%20at%208.13.12%20PM.png

Now it looks like: 
![fixed 
labels](https://cloud.githubusercontent.com/assets/592286/18181550/815897c6-7083-11e6-8bbe-9825ac144c83.png)

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink improve-text

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2457


commit 67db4e7c03787645c45ef3bc95b45060892bb569
Author: Ivan Mushketyk 
Date:   2016-09-01T19:24:29Z

[FLINK-3680] Remove "(not set)" text in the Job Plan UI




> Remove or improve (not set) text in the Job Plan UI
> ---
>
> Key: FLINK-3680
> URL: https://issues.apache.org/jira/browse/FLINK-3680
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Jamie Grier
>Assignee: Ivan Mushketyk
> Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot 
> 2016-03-29 at 8.13.12 PM.png
>
>
> When running streaming jobs the UI displays "(not set)" in a few different 
> places. This is not the case for batch jobs.
> To illustrate I've included screen shots of the UI for the batch and 
> streaming WordCount examples.





[GitHub] flink pull request #2457: [FLINK-3680] Remove "(not set)" text in the Job Pl...

2016-09-01 Thread mushketyk
GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2457

[FLINK-3680] Remove "(not set)" text in the Job Plan UI

This PR removes the "(not set)" text in the web frontend.

Currently it looks like this: 
https://issues.apache.org/jira/secure/attachment/12796006/Screen%20Shot%202016-03-29%20at%208.13.12%20PM.png

Now it looks like: 
![fixed 
labels](https://cloud.githubusercontent.com/assets/592286/18181550/815897c6-7083-11e6-8bbe-9825ac144c83.png)

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink improve-text

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2457


commit 67db4e7c03787645c45ef3bc95b45060892bb569
Author: Ivan Mushketyk 
Date:   2016-09-01T19:24:29Z

[FLINK-3680] Remove "(not set)" text in the Job Plan UI






[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456319#comment-15456319
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
>
According to the design document, netty authentication is also part of this 
JIRA. Why was it not addressed?

The netty layer is addressed as part of web layer authentication (T2-3 & 
T2-5 combined). Do you see a need to address this in some other part of the 
code as well?


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
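For illustration only (this is not Flink's actual implementation), a shared-secret cookie check of the kind listed above is typically done with a constant-time comparison such as `MessageDigest.isEqual`, so that the comparison time does not leak how many leading bytes match:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class SharedSecretCheck {

    // Constant-time equality check of the expected and presented secrets.
    static boolean cookieMatches(String expected, String presented) {
        byte[] a = expected.getBytes(StandardCharsets.UTF_8);
        byte[] b = presented.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(a, b);
    }

    public static void main(String[] args) {
        System.out.println(cookieMatches("s3cret", "s3cret")); // true
        System.out.println(cookieMatches("s3cret", "guess"));  // false
    }
}
```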





[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-09-01 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2425
  
>
According to the design document, netty authentication is also part of this 
JIRA. Why was it not addressed?

The netty layer is addressed as part of web layer authentication (T2-3 & 
T2-5 combined). Do you see a need to address this in some other part of the 
code as well?




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456282#comment-15456282
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77230955
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File 
getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to persist cookie for the appID: {} in {} ", 
appId, path);
+   try {
+   File f = new File(path);
+   if(!f.exists()) {
+   f.createNewFile();
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (subNode.containsKey(cookieKey)) {
+   String errorMessage = "Secure Cookie is already 
found in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   subNode.addProperty(cookieKey, cookie);
+   config.save();
+   LOG.debug("Persisted cookie for the appID: {}", appId);
+   } catch(Exception e) {
+   LOG.error("Exception occurred while persisting app 
state for app id: {}. Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+   }
+
+   public static String getAppSecureCookie(String appId) {
+   if(appId == null) {
+   String errorMessage = "Application ID cannot be null";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+
+   String cookieFromFile;
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to fetch cookie for the appID: {} from {}", 
appId, path);
+
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (!subNode.containsKey(cookieKey)) {
+   String errorMessage = "Could  not find the app 
ID section in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   cookieFromFile = subNode.getString(cookieKey, "");
+   if(cookieFromFile.length() == 0) {
+   String errorMessage = "Could  not find cookie 
in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   } catch(Exception e) {
+   LOG.error("Exception occurred while fetching cookie for 
app id: {} Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+
+   LOG.debug("Found cookie for the appID: {}", appId);
+   return cookieFromFile;
+   }
+
+   public static void removeAppState(String appId) {
+   if(appId == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to remove the reference for the appId: {} from 
{}", appId, path);
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.warn(errorMessage);

[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread vijikarthi
Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77230955
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+   try {
+   File f = new File(path);
+   if(!f.exists()) {
+   f.createNewFile();
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (subNode.containsKey(cookieKey)) {
+   String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   subNode.addProperty(cookieKey, cookie);
+   config.save();
+   LOG.debug("Persisted cookie for the appID: {}", appId);
+   } catch(Exception e) {
+   LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+   }
+
+   public static String getAppSecureCookie(String appId) {
+   if(appId == null) {
+   String errorMessage = "Application ID cannot be null";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+
+   String cookieFromFile;
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path);
+
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: " + path + " in user home directory";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (!subNode.containsKey(cookieKey)) {
+   String errorMessage = "Could not find the app ID section in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   cookieFromFile = subNode.getString(cookieKey, "");
+   if(cookieFromFile.length() == 0) {
+   String errorMessage = "Could not find cookie in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   } catch(Exception e) {
+   LOG.error("Exception occurred while fetching cookie for app id: {} Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+
+   LOG.debug("Found cookie for the appID: {}", appId);
+   return cookieFromFile;
+   }
+
+   public static void removeAppState(String appId) {
+   if(appId == null) { return; }
+   String path = System.getProperty("user.home") + File.separator + fileName;
+   LOG.debug("Going to remove the reference for the appId: {} from {}", appId, path);
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: " + path + " in user home directory";
+   LOG.warn(errorMessage);
+   return;
+   }
+   HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+   config.clearTree(appId);
+   config.save();
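For reference, the file these helpers read and write is a standard INI layout with one section per YARN application. The section names and the `secureCookie` key below are illustrative only (the actual `cookieKey` constant and file name are defined elsewhere in the patch):

```ini
; <user.home>/<fileName> -- one section per application (names illustrative)
[application_1473200000000_0001]
secureCookie = 9f2b...

[application_1473200000000_0002]
secureCookie = 41ac...
```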

[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4559:
---
Priority: Minor  (was: Critical)

> Kinesis Producer not setting credentials provider properly when 
> AWS_CREDENTIALS_PROVIDER is "AUTO"
> --
>
> Key: FLINK-4559
> URL: https://issues.apache.org/jira/browse/FLINK-4559
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.1.3
>
>
> If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, 
> {{AWSUtils.getCredentialsProvider}} will return {{null}}, so 
> {{setCredentialsProvider}} should not be explicitly called on the internally 
> built {{KinesisProducerConfiguration}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456245#comment-15456245
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77227679
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+public class BlobClientSecureTest extends BlobClientTest {
+
+   /**
+* Starts the BLOB server with secure cookie enabled configuration
+*/
+   @BeforeClass
+   public static void startServer() {
+   try {
+   config.setBoolean(ConfigConstants.SECURITY_ENABLED, true);
+   config.setString(ConfigConstants.SECURITY_COOKIE, "foo");
+   BLOB_SERVER = new BlobServer(config);
+   }
+   catch (IOException e) {
+   e.printStackTrace();
+   fail(e.getMessage());
+   }
--- End diff --

Thanks. Will add the test case.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
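The shared secret cookie described above is ultimately a string comparison between a configured secret and the value a client presents. A minimal sketch of such a check, using a constant-time comparison so the secret cannot be probed byte by byte, could look like the following; this is illustrative only, not the implementation in the PR:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class CookieCheck {

    // Compares the client-supplied cookie against the configured secret in
    // constant time (MessageDigest.isEqual), so response timing does not
    // reveal how many leading bytes matched.
    static boolean isAuthorized(String configuredSecret, String receivedCookie) {
        if (configuredSecret == null || receivedCookie == null) {
            return false;
        }
        return MessageDigest.isEqual(
            configuredSecret.getBytes(StandardCharsets.UTF_8),
            receivedCookie.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(isAuthorized("foo", "foo")); // true
        System.out.println(isAuthorized("foo", "bar")); // false
    }
}
```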



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456217#comment-15456217
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77226096
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -426,4 +440,11 @@ void unregisterConnection(BlobServerConnection conn) {
}
}
 
+   /* Secure cookie to authenticate */
+   @Override
+   public String getSecureCookie() { return secureCookie; }
+
+   /* Flag to indicate if service level authentication is enabled or not */
+   public boolean isSecurityEnabled() { return secureCookie != null; }
--- End diff --

We expect the secure cookie configuration to be available if security is 
enabled. A missing value will be reported as an error ahead of time. Are you 
expecting any other conditions to be met? Could you please elaborate?


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456204#comment-15456204
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-3679:


+1 to fix the issue; the proposed changes seem reasonable and head towards a 
better API. The Kinesis consumer will need to adapt to this change as well, as 
it also accepts a DeserializationSchema.

> DeserializationSchema should handle zero or more outputs for every input
> 
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Kafka Connector
>Reporter: Jamie Grier
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between input and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and say the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw IOException and therefore fail the 
> job.  
> This is definitely not good since bad data is a reality and failing the job 
> is not the right option.  If the job fails we'll just end up replaying the 
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty 
> set.
> The other case is where one input message should logically be multiple output 
> messages.  This case is probably less important since there are other ways to 
> do this but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456157#comment-15456157
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user vijikarthi commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77221958
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java ---
@@ -99,7 +110,43 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
currentDecoder.destroy();
currentDecoder = null;
}
-   
+
+   if(secureCookie != null) {
--- End diff --

The secure cookie value could be auto-populated (YARN) or user-provided, but it 
will be persisted in the in-memory Flink configuration instance that is passed 
to the web layer during bootstrap. Should the user decide to turn security off, 
we expect the services to be restarted to reflect the change.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-09-01 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456065#comment-15456065
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

Reading the code, I think I need to update the doc on how the memory pages can 
be allocated for each task. I will do some tests first, then update the doc.

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FLINK-3322.docx
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. 
> (It will generate some lookuptables to /tmp on first run for a few minutes.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
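The pressure described above is exactly what segment pooling avoids: released segments go back to a free list instead of being handed to the GC and reallocated every superstep. A deliberately simplified stand-in for that idea (not Flink's MemoryManager, which manages off-heap/managed MemorySegments) could look like:

```java
import java.util.ArrayDeque;

public class SegmentPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private final int segmentSize;

    public SegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    // Reuse a previously released segment when available; allocate a fresh
    // array only on a pool miss.
    public byte[] allocate() {
        byte[] seg = free.pollFirst();
        return seg != null ? seg : new byte[segmentSize];
    }

    // Return the segment to the pool instead of leaving it for the GC.
    public void release(byte[] segment) {
        if (segment != null && segment.length == segmentSize) {
            free.addFirst(segment);
        }
    }

    public int pooled() {
        return free.size();
    }
}
```

With preallocation (or pooling) in place, an iterative job's supersteps recycle the same segments rather than generating garbage proportional to the managed memory size on every iteration.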



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-4559:
--

 Summary: Kinesis Producer not setting credentials provider 
properly when AWS_CREDENTIALS_PROVIDER is "AUTO"
 Key: FLINK-4559
 URL: https://issues.apache.org/jira/browse/FLINK-4559
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai
Priority: Critical


If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, 
{{AWSUtils.getCredentialsProvider}} will return {{null}}, so 
{{setCredentialsProvider}} should not be explicitly called on the internally 
built {{KinesisProducerConfiguration}}.
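The fix implied here is a null guard around the setter. The sketch below uses local stub types in place of the AWS SDK and connector classes (which are not reproduced here) purely to show the pattern; the assumption that the real configuration object must not receive null is what motivates the guard:

```java
public class CredentialsGuardSketch {

    // Stub stand-ins for the AWS SDK / connector types named in the issue.
    static class CredentialsProvider {}

    static class ProducerConfig {
        CredentialsProvider provider;

        // Assumed behavior: the real KinesisProducerConfiguration does not
        // accept null, hence the caller-side guard below.
        void setCredentialsProvider(CredentialsProvider p) {
            if (p == null) {
                throw new IllegalArgumentException("provider must not be null");
            }
            this.provider = p;
        }
    }

    // Mirrors AWSUtils.getCredentialsProvider returning null for AUTO,
    // meaning "let the producer discover credentials itself".
    static CredentialsProvider resolveProvider(String mode) {
        return "AUTO".equals(mode) ? null : new CredentialsProvider();
    }

    static ProducerConfig configure(String mode) {
        ProducerConfig config = new ProducerConfig();
        CredentialsProvider provider = resolveProvider(mode);
        if (provider != null) { // the guard: skip the call entirely for AUTO
            config.setCredentialsProvider(provider);
        }
        return config;
    }
}
```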





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456048#comment-15456048
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Thanks for the review @StephanEwen.

I agree that we have different style preferences concerning method 
parameters. I also agree that one parameter per line is a little more 
verbose and requires more vertical scrolling. The best approach would probably 
be to reduce the number of method parameters by grouping them.


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3947) Provide low level access to RocksDB state backend

2016-09-01 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456040#comment-15456040
 ] 

Elias Levy commented on FLINK-3947:
---

The use case is maintaining a large set of items as state for a keyed stream, 
where items in the set may be added or removed incrementally as new messages 
are processed.

There are a number of problems with the current RocksDB state backend 
implementation:

- It can handle aggregate state in a keyed stream that won't fit in memory as 
long as per key state fits in memory, but it can't handle per key state that 
won't fit in memory.

- Even when per-key state fits in memory, it is extremely inefficient at 
handling state that consists of a large set or list of items, where a small 
number of items are added/removed from the set/list each time a message is 
processed. The current implementation serializes the whole set/list as a 
single value, and thus deserializes/serializes all of it every time a single 
value is added/removed.

An example of such state is an LRU cache for values extracted from a stream, 
where during each processElement invocation elements may be added to and 
removed from the keyed state.

With low-level access to RocksDB, such an LRU cache could be easily 
implemented. Set items would be serialized one per KV pair in RocksDB, with 
the key starting with the stream partition key but also encoding the value's 
timestamp. Expiration is then just a matter of iterating over the key range, 
deleting expired keys, and stopping after observing a value that should not 
be expired. Alternatively, if the RocksDB TTL functionality were exposed, 
that could be used.

Even if low-level access to RocksDB is not provided, Flink needs other 
implementations of state management for container data types, where the data 
is stored and accessed more efficiently than by serializing/deserializing the 
whole data structure.
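The key-encoding idea in this comment (partition key plus timestamp, expired by an ordered range scan) can be sketched against any sorted map. Here a TreeMap stands in for RocksDB's sorted key space, and the string encoding is illustrative, not a proposed on-disk format:

```java
import java.util.Map;
import java.util.TreeMap;

public class TtlStateSketch {
    // Ordered store keyed by "partitionKey|timestamp"; TreeMap stands in
    // for RocksDB's lexicographically sorted key space.
    private final TreeMap<String, String> store = new TreeMap<>();

    private static String encode(String partitionKey, long timestamp) {
        // Fixed-width, zero-padded timestamp so that lexicographic order
        // matches numeric order within one partition key.
        return partitionKey + "|" + String.format("%019d", timestamp);
    }

    public void put(String partitionKey, long timestamp, String value) {
        store.put(encode(partitionKey, timestamp), value);
    }

    // Deletes entries for the key older than the cutoff. The scan is bounded
    // to [key|0, key|cutoff), mirroring a RocksDB range scan that stops as
    // soon as it leaves the expired range.
    public int expireOlderThan(String partitionKey, long cutoff) {
        Map<String, String> expired =
            store.subMap(encode(partitionKey, 0L), true, encode(partitionKey, cutoff), false);
        int removed = expired.size();
        expired.clear(); // subMap is a view, so this removes from the store
        return removed;
    }

    public int size() {
        return store.size();
    }
}
```

Each set item lives in its own KV pair, so adding, removing, or expiring one element never touches the rest of the per-key state.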






> Provide low level access to RocksDB state backend
> -
>
> Key: FLINK-3947
> URL: https://issues.apache.org/jira/browse/FLINK-3947
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>
> The current state API is limiting and some implementations are not as 
> efficient as they could be, particularly when working with large states. For 
> instance, a ListState is append only.  You cannot remove values from the 
> list.  And the RocksDBListState get() implementation reads all list values 
> from RocksDB instead of returning an Iterable that only reads values as 
> needed.
> Furthermore, RocksDB is an ordered KV store, yet there is no ordered map 
> state API with an ability to iterate over the stored values in order.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4559:
---
Affects Version/s: 1.1.2

> Kinesis Producer not setting credentials provider properly when 
> AWS_CREDENTIALS_PROVIDER is "AUTO"
> --
>
> Key: FLINK-4559
> URL: https://issues.apache.org/jira/browse/FLINK-4559
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.3
>
>
> If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, 
> {{AWSUtils.getCredentialsProvider}} will return {{null}}, so 
> {{setCredentialsProvider}} should not be explicitly called on the internally 
> built {{KinesisProducerConfiguration}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4559:
---
Fix Version/s: 1.1.3

> Kinesis Producer not setting credentials provider properly when 
> AWS_CREDENTIALS_PROVIDER is "AUTO"
> --
>
> Key: FLINK-4559
> URL: https://issues.apache.org/jira/browse/FLINK-4559
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.1.0, 1.1.1, 1.1.2
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.3
>
>
> If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, 
> {{AWSUtils.getCredentialsProvider}} will return {{null}}, so 
> {{setCredentialsProvider}} should not be explicitly called on the internally 
> built {{KinesisProducerConfiguration}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"

2016-09-01 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4559:
---
Affects Version/s: 1.1.1
   1.1.0

> Kinesis Producer not setting credentials provider properly when 
> AWS_CREDENTIALS_PROVIDER is "AUTO"
> --
>
> Key: FLINK-4559
> URL: https://issues.apache.org/jira/browse/FLINK-4559
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, 
> {{AWSUtils.getCredentialsProvider}} will return {{null}}, so 
> {{setCredentialsProvider}} should not be explicitly called on the internally 
> built {{KinesisProducerConfiguration}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by L...

2016-09-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2450#discussion_r77212855
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala ---
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
 ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
 InetAddress.getByName("localhost").getHostAddress())
 
-  val configuration = generateConfiguration(userConfiguration)
+  protected val _configuration = generateConfiguration(userConfiguration)
--- End diff --

It's actually the original configuration which will usually have the job 
manager ipc port set to 0. Thus, in order to return a configuration which 
points to the current leader, we have to generate a new configuration via 
`configuration` which inserts the selected port. I'll rename the variable to 
`originalConfiguration`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455997#comment-15455997
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2450#discussion_r77212855
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 ---
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
 ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
 InetAddress.getByName("localhost").getHostAddress())
 
-  val configuration = generateConfiguration(userConfiguration)
+  protected val _configuration = generateConfiguration(userConfiguration)
--- End diff --

It's actually the original configuration which will usually have the job 
manager ipc port set to 0. Thus, in order to return a configuration which 
points to the current leader, we have to generate a new configuration via 
`configuration` which inserts the selected port. I'll rename the variable to 
`originalConfiguration`. 


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

2016-09-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2449
  
I've addressed your comments @StephanEwen.




[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455990#comment-15455990
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2449
  
I've addressed your comments @StephanEwen.


> Replace ActorGateways in NetworkEnvironment by interfaces
> -
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.
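The decoupling proposed above can be sketched roughly as follows. The notifier interface name echoes `ResultPartitionConsumableNotifier` from the discussion, but the signatures and the `Gateway` stand-in are simplified illustrations, not Flink's actual API:

```java
// Hypothetical sketch: NetworkEnvironment would talk to an interface, and one
// implementation wraps each call into a message sent via an actor gateway,
// preserving today's behavior behind the new abstraction.
interface ResultPartitionConsumableNotifier {
    void notifyPartitionConsumable(String jobId, String partitionId);
}

class ActorGatewayNotifier implements ResultPartitionConsumableNotifier {
    // Stand-in for Flink's ActorGateway; a single-method interface so tests
    // can supply a lambda.
    interface Gateway { void tell(Object message); }

    private final Gateway jobManagerGateway;

    ActorGatewayNotifier(Gateway jobManagerGateway) {
        this.jobManagerGateway = jobManagerGateway;
    }

    @Override
    public void notifyPartitionConsumable(String jobId, String partitionId) {
        // Wrap the method call in a message and send it to the recipient.
        jobManagerGateway.tell(
            "ScheduleOrUpdateConsumers(" + jobId + "," + partitionId + ")");
    }
}
```

In Flip-6, `JobMaster` and `TaskExecutor` would implement such interfaces directly as part of their RPC contract, so the actor-backed wrapper becomes just one possible implementation.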





[jira] [Commented] (FLINK-4558) Add support for synchronizing streams

2016-09-01 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455966#comment-15455966
 ] 

Elias Levy commented on FLINK-4558:
---

It should be noted that Flink already performs a similar function in the 
handling of snapshot barriers: it pauses a stream into an operator with 
multiple inputs upon receiving a barrier, so as to align the two streams 
at the barrier when generating a snapshot.
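As a rough illustration of that alignment (a simplified sketch, not Flink's actual implementation), a two-input aligner blocks an input once its barrier arrives and buffers further elements from it until the barrier shows up on the other input:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;

// Minimal barrier-alignment sketch for a two-input operator. Inputs are 0 and 1.
class BarrierAligner {
    private final boolean[] barrierSeen = new boolean[2];
    private final List<ArrayDeque<Object>> buffered =
            Arrays.asList(new ArrayDeque<>(), new ArrayDeque<>());

    /** Returns true if the element may be processed now, false if it was buffered. */
    boolean onElement(int input, Object element) {
        if (barrierSeen[input]) {            // blocked until the other barrier arrives
            buffered.get(input).add(element);
            return false;
        }
        return true;
    }

    /** Returns true once barriers from both inputs have arrived (streams aligned). */
    boolean onBarrier(int input) {
        barrierSeen[input] = true;
        return barrierSeen[0] && barrierSeen[1];
    }

    /** After the snapshot, unblock an input and hand back its buffered elements. */
    ArrayDeque<Object> drain(int input) {
        barrierSeen[input] = false;
        return buffered.get(input);
    }
}
```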

> Add support for synchronizing streams
> -
>
> Key: FLINK-4558
> URL: https://issues.apache.org/jira/browse/FLINK-4558
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>
> As mentioned on the [mailing 
> list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html],
>  there are use cases that require synchronizing two streams via their 
> times and where it is not practical to buffer all messages from one stream 
> while waiting for the other to synchronize.  Flink should add functionality 
> to enable such use cases.
> This could be implemented by modifying TwoInputStreamOperator so that calls 
> to processElement1 and processElement2 could return a value indicating that 
> the element can't yet be processed, having the framework then pause 
> processing for some time, potentially using exponential back off with a hard 
> maximum, and then allowing the back pressure system to do its work and pause 
> the stream.
> Alternatively, an API could be added to explicitly pause/unpause a stream.
> For ease of use, either of these mechanisms should be used to create a 
> SynchronizedTwoInputStreamOperator that end users can utilize by passing a 
> configurable time delta to use as a synchronization threshold.
>  
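The proposed back-off could look roughly like the following. The names (`ProcessResult`, `delayMillis`) are illustrative, not Flink API; the idea is that `processElement1`/`processElement2` return a status, and the framework retries a deferred element after an exponentially growing delay with a hard cap:

```java
// Hypothetical sketch of the retry policy proposed in FLINK-4558.
class SyncBackoff {

    // Status a two-input operator could return from processElement1/2.
    enum ProcessResult { PROCESSED, NOT_READY }

    /** Delay before retry attempt n: base * 2^n milliseconds, capped at maxMillis. */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis << Math.min(attempt, 30);  // bound the shift to avoid overflow
        return Math.min(delay, maxMillis);
    }
}
```

While a retry is pending, the framework would stop pulling from that input, letting the existing back-pressure mechanism pause the upstream producer.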





[jira] [Updated] (FLINK-4461) Ensure all the classes are tagged with suitable annotations

2016-09-01 Thread ramkrishna.s.vasudevan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ramkrishna.s.vasudevan updated FLINK-4461:
--
Attachment: FLINK_annotations.xlsx

Just attaching a list of public classes and interfaces that have no annotations 
attached. It may be worth fixing at least the actual public classes which 
are missing annotations.

> Ensure all the classes are tagged with suitable annotations
> ---
>
> Key: FLINK-4461
> URL: https://issues.apache.org/jira/browse/FLINK-4461
> Project: Flink
>  Issue Type: Improvement
>Reporter: ramkrishna.s.vasudevan
>Assignee: ramkrishna.s.vasudevan
> Attachments: FLINK_annotations.xlsx
>
>
> Currently in Flink we have three annotations
> Public
> PublicEvolving
> Internal.
> But some of the classes, though public, are not tagged. Some may be 
> advanced features, but tagging them would still help users know 
> which are public-facing and which are internal APIs/interfaces. 
> I just ran a sample util in streaming-java package and I got these
> {code}
> class org.apache.flink.streaming.runtime.operators.CheckpointCommitter
> class 
> org.apache.flink.streaming.api.functions.source.FileMonitoringFunction$WatchType
> interface org.apache.flink.streaming.api.functions.TimestampExtractor
> class 
> org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
> class org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet
> class org.apache.flink.streaming.api.windowing.triggers.TriggerResult
> class 
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> class org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator
> class 
> org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink$ExactlyOnceState
> interface 
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
> class 
> org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
> interface 
> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> class 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction
> interface 
> org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet$MergeFunction
> class org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider
> class 
> org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema
> class org.apache.flink.streaming.api.functions.source.FileReadFunction
> class 
> org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
> class org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask
> class org.apache.flink.streaming.api.functions.source.FileMonitoringFunction
> class org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput
> class org.apache.flink.streaming.api.functions.IngestionTimeExtractor
> class 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator
> class 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction
> class 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction
> class 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction
> interface org.apache.flink.streaming.api.functions.TimestampAssigner
> class org.apache.flink.streaming.api.operators.StoppableStreamSource
> class org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink
> class 
> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction
> class org.apache.flink.streaming.util.HDFSCopyToLocal
> class 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator
> class org.apache.flink.streaming.api.collector.selector.DirectedOutput
> class org.apache.flink.streaming.runtime.tasks.TimeServiceProvider
> class org.apache.flink.streaming.util.HDFSCopyFromLocal
> class 
> org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
> {code}
> These classes are simply not tagged.  In the list above, TimestampAssigner 
> should, I believe, fall under the @Public tag.
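Such an audit boils down to a reflective check for a stability annotation. The sketch below uses hypothetical stand-ins for Flink's `@Public` / `@Internal` annotations (the real ones live in flink-annotations); the class names mirror two entries from the list above:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Hypothetical stand-ins for Flink's API-stability annotations.
@Retention(RetentionPolicy.RUNTIME)
@interface Public {}

@Retention(RetentionPolicy.RUNTIME)
@interface Internal {}

@Public
interface TimestampAssigner {}   // tagged: a user-facing interface

class MergingWindowSet {}        // untagged: would show up in the audit

class AnnotationAudit {
    /** True if the class carries any API-stability annotation. */
    static boolean isTagged(Class<?> clazz) {
        return clazz.isAnnotationPresent(Public.class)
                || clazz.isAnnotationPresent(Internal.class);
    }
}
```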





[jira] [Created] (FLINK-4558) Add support for synchronizing streams

2016-09-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-4558:
-

 Summary: Add support for synchronizing streams
 Key: FLINK-4558
 URL: https://issues.apache.org/jira/browse/FLINK-4558
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.1.0
Reporter: Elias Levy


As mentioned on the [mailing 
list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html],
 there are use cases that require synchronizing two streams via their times 
and where it is not practical to buffer all messages from one stream while 
waiting for the other to synchronize.  Flink should add functionality to enable 
such use cases.

This could be implemented by modifying TwoInputStreamOperator so that calls to 
processElement1 and processElement2 could return a value indicating that the 
element can't yet be processed, having the framework then pause processing for 
some time, potentially using exponential back off with a hard maximum, and then 
allowing the back pressure system to do its work and pause the stream.

Alternatively, an API could be added to explicitly pause/unpause a stream.

For ease of use, either of these mechanisms should be used to create a 
SynchronizedTwoInputStreamOperator that end users can utilize by passing a 
configurable time delta to use as a synchronization threshold.
 






[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...

2016-09-01 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2449
  
Thanks for the quick and thorough review @StephanEwen.

- Concerning the `ExecutionContext` in the `TaskManager`: This is simply 
the `ExecutionContext` which was passed before to the `NetworkEnvironment` but 
now is no longer needed there. I agree that we should be careful with thread 
pool creations. At the moment, we create for each `JobManager` and 
`TaskManager` an additional execution context. The advantage of this approach 
is that we separate the actor execution from the execution of other components. 
Thus, another component cannot starve the actor's execution context and thereby 
affect its responsiveness. But I agree that the few 
additional future callbacks shouldn't be a big burden for the `TaskManager` 
actor system. Will change it.

- It is true that there is no notion of slots on the `TaskManager` right 
now. I've named it `SlotEnvironment` because with the Flip-6 refactorings we 
need to maintain multiple job manager connections and every task runs in a slot 
which is associated with a JobManager. Renaming it to `JobManagerConnection` 
should better reflect what it actually is.

- Yes that is a good idea. Will try to attach the 
`ResultPartitionConsumableNotifier` to the `Task`.




[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455878#comment-15455878
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2449
  
Thanks for the quick and thorough review @StephanEwen.

- Concerning the `ExecutionContext` in the `TaskManager`: This is simply 
the `ExecutionContext` which was passed before to the `NetworkEnvironment` but 
now is no longer needed there. I agree that we should be careful with thread 
pool creations. At the moment, we create for each `JobManager` and 
`TaskManager` an additional execution context. The advantage of this approach 
is that we separate the actor execution from the execution of other components. 
Thus, another component cannot starve the actor's execution context and thereby 
affect its responsiveness. But I agree that the few 
additional future callbacks shouldn't be a big burden for the `TaskManager` 
actor system. Will change it.

- It is true that there is no notion of slots on the `TaskManager` right 
now. I've named it `SlotEnvironment` because with the Flip-6 refactorings we 
need to maintain multiple job manager connections and every task runs in a slot 
which is associated with a JobManager. Renaming it to `JobManagerConnection` 
should better reflect what it actually is.

- Yes that is a good idea. Will try to attach the 
`ResultPartitionConsumableNotifier` to the `Task`.


> Replace ActorGateways in NetworkEnvironment by interfaces
> -
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.





[jira] [Assigned] (FLINK-4549) Test and document implicitly supported SQL functions

2016-09-01 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-4549:
---

Assignee: Timo Walther

> Test and document implicitly supported SQL functions
> 
>
> Key: FLINK-4549
> URL: https://issues.apache.org/jira/browse/FLINK-4549
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Calcite supports many SQL functions by translating them into {{RexNode}}s. 
> However, SQL functions like {{NULLIF}} and {{OVERLAPS}} are neither tested nor 
> documented although supported.
> These functions should be tested and added to the documentation. We could 
> adopt parts from the Calcite documentation.





[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem

2016-09-01 Thread Niels Basjes (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455755#comment-15455755
 ] 

Niels Basjes commented on FLINK-4485:
-

I have tried to create a minimal application that reproduces the problem I see.

# Get flink 1.1.1 scala 2.10 binary for Linux.
# Manually update yarn-session.sh to the latest in master to fix the HBase 
classpath issue.
# Make sure you have HBase running and configured properly (i.e. HBASE_CONF_DIR 
and HADOOP_CONF_DIR are setup correctly in your environment).
# Create a table called {{test}} in HBase with at least 1 row in it.
# Start {{./flink-1.1.1/bin/yarn-session.sh -n2 -s5 -d}}
# Get this test project and build it: 
https://github.com/nielsbasjes/Reproduce-FLINK-4485
# Then run this jar file with something like {{./flink-1.1.1/bin/flink run 
target/FLINK-4485-1.0-SNAPSHOT.jar}} several times.
# Now when you do on the Hadoop node running the jobmanager {{lsof | fgrep 
blob}} you should see the deleted files as shown before.

This reproduction path works on my machine ...
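The `(deleted)` entries in that `lsof` output point at the classic Unix leak pattern: on Linux, deleting a file whose descriptor is still open keeps its blocks allocated until the handle is closed. A minimal sketch of the suspected pattern (names are illustrative, not the jobmanager's actual code):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

// Demonstrates why `du` sees little usage while the filesystem is full:
// the deleted file's space is only reclaimed once every open handle closes.
class LeakedHandleDemo {
    static FileInputStream openAndDelete(File f) throws IOException {
        FileInputStream in = new FileInputStream(f);
        f.delete();          // file vanishes from the directory tree ...
        return in;           // ... but forgetting in.close() pins the disk space
    }
}
```

Killing the JVM (as observed with the jobmanager) closes all its descriptors, which is why the space was freed only then.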




> Finished jobs in yarn session fill /tmp filesystem
> --
>
> Key: FLINK-4485
> URL: https://issues.apache.org/jira/browse/FLINK-4485
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.1.0
>Reporter: Niels Basjes
>Priority: Blocker
>
> On a Yarn cluster I start a yarn-session with a few containers and task slots.
> Then I fire a 'large' number of Flink batch jobs in sequence against this 
> yarn session. It is the exact same job (java code) yet it gets different 
> parameters.
> In this scenario it is exporting HBase tables to files in HDFS and the 
> parameters are about which data from which tables and the name of the target 
> directory.
> After running several dozen jobs, job submission started to fail and we 
> investigated.
> We found that the cause was that on the Yarn node which was hosting the 
> jobmanager the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use.
> We found that a very large file (we guess it is the jar of the job) was put 
> in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager.
> As soon as we killed the jobmanager the disk space was freed.
> The summary of the impact of this is that a yarn-session that receives enough 
> jobs brings down the Yarn node for all users.
> See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND PID  USER   FD  TYPE DEVICE  SIZE   
> NODE NAME
> java  15034   nbasjes  550r  REG 253,17  66219695
> 245 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 
> (deleted)
> java  15034   nbasjes  551r  REG 253,17  66219695
> 252 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 
> (deleted)
> java  15034   nbasjes  552r  REG 253,17  66219695
> 267 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 
> (deleted)
> java  15034   nbasjes  553r  REG 253,17  66219695
> 250 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 
> (deleted)
> java  15034   nbasjes  554r  REG 253,17  66219695
> 288 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 
> (deleted)
> java  15034   nbasjes  555r  REG 253,17  66219695
> 298 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 
> (deleted)
> java  15034   nbasjes  557r  REG 253,17  66219695
> 254 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 
> (deleted)
> java  15034   nbasjes  558r  REG 253,17  66219695
> 292 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 
> (deleted)
> java  15034   nbasjes  559r  REG 253,17  66219695
> 275 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 
> (deleted)
> java  15034   nbasjes  560r  REG 253,17  66219695
> 159 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 
> (deleted)
> java  15034   nbasjes  562r  REG 253,17  66219695
> 238 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 
> (deleted)
> java  15034   nbasjes  568r  REG 253,17  66219695
> 246 
> /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 
> (deleted)
> java  15034   nbasjes  569r  REG 253,17  66219695
> 255 
> /tmp/blobStore-

[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455751#comment-15455751
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2449#discussion_r77195218
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -18,130 +18,88 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The 
network environment contains
  * the data structures that keep track of all intermediate results and all 
data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network 
buffer pool.
- * All other components (netty, intermediate result managers, ...) are 
only created once the
- * environment is "associated" with a TaskManager and JobManager. This 
happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
 
private final Object lock = new Object();
 
-   private final NetworkEnvironmentConfiguration configuration;
-
-   private final FiniteDuration jobManagerTimeout;
-
private final NetworkBufferPool networkBufferPool;
 
-   private ConnectionManager connectionManager;
+   private final ConnectionManager connectionManager;
 
-   private ResultPartitionManager partitionManager;
+   private final ResultPartitionManager resultPartitionManager;
 
-   private TaskEventDispatcher taskEventDispatcher;
-
-   private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-   private PartitionStateChecker partitionStateChecker;
+   private final TaskEventDispatcher taskEventDispatcher;
 
/** Server for {@link org.apache.flink.runtime.state.KvState} requests. 
*/
-   private KvStateServer kvStateServer;
+   private

[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...

2016-09-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2449#discussion_r77195218
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -18,130 +18,88 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The 
network environment contains
  * the data structures that keep track of all intermediate results and all 
data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network 
buffer pool.
- * All other components (netty, intermediate result managers, ...) are 
only created once the
- * environment is "associated" with a TaskManager and JobManager. This 
happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
 
private final Object lock = new Object();
 
-   private final NetworkEnvironmentConfiguration configuration;
-
-   private final FiniteDuration jobManagerTimeout;
-
private final NetworkBufferPool networkBufferPool;
 
-   private ConnectionManager connectionManager;
+   private final ConnectionManager connectionManager;
 
-   private ResultPartitionManager partitionManager;
+   private final ResultPartitionManager resultPartitionManager;
 
-   private TaskEventDispatcher taskEventDispatcher;
-
-   private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-   private PartitionStateChecker partitionStateChecker;
+   private final TaskEventDispatcher taskEventDispatcher;
 
/** Server for {@link org.apache.flink.runtime.state.KvState} requests. 
*/
-   private KvStateServer kvStateServer;
+   private final KvStateServer kvStateServer;
 
/** Registry for {@link org.apache.flink.runtime.state.KvState} 
instances. */
-   private KvStateRegistry kvStateRegistry;
+   private final KvStateRegistry kvStateRegistry;
 
-   private b

[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455743#comment-15455743
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2449#discussion_r77194531
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -476,6 +252,29 @@ public void unregisterTask(Task task) {
}
}
 
+   public void start() throws IOException {
+   synchronized (lock) {
+   LOG.info("Starting the network environment and its 
components.");
--- End diff --

Yes that's true. Will change it.


> Replace ActorGateways in NetworkEnvironment by interfaces
> -
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455740#comment-15455740
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2449#discussion_r77194470
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -18,130 +18,88 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The 
network environment contains
  * the data structures that keep track of all intermediate results and all 
data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network 
buffer pool.
- * All other components (netty, intermediate result managers, ...) are 
only created once the
- * environment is "associated" with a TaskManager and JobManager. This 
happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
 
private final Object lock = new Object();
 
-   private final NetworkEnvironmentConfiguration configuration;
-
-   private final FiniteDuration jobManagerTimeout;
-
private final NetworkBufferPool networkBufferPool;
 
-   private ConnectionManager connectionManager;
+   private final ConnectionManager connectionManager;
 
-   private ResultPartitionManager partitionManager;
+   private final ResultPartitionManager resultPartitionManager;
 
-   private TaskEventDispatcher taskEventDispatcher;
-
-   private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-   private PartitionStateChecker partitionStateChecker;
+   private final TaskEventDispatcher taskEventDispatcher;
 
/** Server for {@link org.apache.flink.runtime.state.KvState} requests. 
*/
-   private KvStateServer kvStateServer;
+   private


[jira] [Commented] (FLINK-4459) Introduce SlotProvider for Scheduler

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455704#comment-15455704
 ] 

ASF GitHub Bot commented on FLINK-4459:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2424
  
I will try and rebase/merge this on top of #2447 and add this to the 
`flip-6` branch as soon as #2447 is approved.


> Introduce SlotProvider for Scheduler
> 
>
> Key: FLINK-4459
> URL: https://issues.apache.org/jira/browse/FLINK-4459
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Reporter: Till Rohrmann
>Assignee: Kurt Young
>
> Currently the {{Scheduler}} maintains a queue of available instances which it 
> scans if it needs a new slot. If it finds a suitable instance (having free 
> slots available) it will allocate a slot from it. 
> This slot allocation logic can be factored out and be made available via a 
> {{SlotProvider}} interface. The {{SlotProvider}} has methods to allocate a 
> slot given a set of location preferences. Slots should be returned as 
> {{Futures}}, because in the future the slot allocation might happen 
> asynchronously (Flip-6). 
> In the first version, the {{SlotProvider}} implementation will simply 
> encapsulate the existing slot allocation logic extracted from the 
> {{Scheduler}}. When a slot is requested it will return a completed or failed 
> future since the allocation happens synchronously.
> The refactoring will have the advantage to simplify the {{Scheduler}} class 
> and to pave the way for upcoming refactorings (Flip-6).
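A minimal sketch of the first-version behavior described above, with guessed types and signatures (the real interface lives in Flink's scheduler and differs): allocation is synchronous, so the returned future is already completed or failed.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

public class SlotProviderSketch {

    /** Guessed shape of the proposed SlotProvider; slots are plain strings here. */
    interface SlotProvider {
        CompletableFuture<String> allocateSlot(Collection<String> locationPreferences);
    }

    /** First version: wraps the existing synchronous Scheduler logic, so the
     *  future is completed (or failed) before it is returned. */
    static final class SynchronousSlotProvider implements SlotProvider {
        private final Deque<String> freeSlots = new ArrayDeque<>();

        SynchronousSlotProvider(Collection<String> availableSlots) {
            freeSlots.addAll(availableSlots);
        }

        @Override
        public CompletableFuture<String> allocateSlot(Collection<String> locationPreferences) {
            // Honor location preferences first, then fall back to any free slot.
            for (String preferred : locationPreferences) {
                if (freeSlots.remove(preferred)) {
                    return CompletableFuture.completedFuture(preferred);
                }
            }
            String any = freeSlots.poll();
            if (any == null) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("no slot available"));
                return failed;
            }
            return CompletableFuture.completedFuture(any);
        }
    }

    public static void main(String[] args) {
        SlotProvider provider = new SynchronousSlotProvider(Arrays.asList("tm-1", "tm-2"));
        System.out.println(provider.allocateSlot(Arrays.asList("tm-2")).join());
    }
}
```

Because the interface already returns a future, an asynchronous Flip-6 implementation can later complete it from another thread without changing any caller.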







[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455674#comment-15455674
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2456

[FLINK-4456] Replace ActorGateway in Task and RuntimeEnvironment

Replaces the `ActorGateway` in `Task` and `RuntimeEnvironment` by 
interfaces to decouple these components from the actors.

- Introduces a `TaskExecutionStateListener` interface for 
`TaskExecutionState` update messages
- Replaces the job manager `ActorGateway` by `InputSplitProvider` and 
`CheckpointNotifier`
- Replaces the task manager `ActorGateway` by `TaskManagerConnection`

The implementations using the `ActorGateways` are

- `InputSplitProvider` --> `TaskInputSplitProvider`
- `TaskExecutionStateListener` --> `ActorGatewayTaskExecutionStateListener`
- `CheckpointNotifier` --> `ActorGatewayCheckpointNotifier`
- `TaskManagerConnection` --> `ActorGatewayTaskManagerConnection`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink FLINK-4456

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2456.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2456


commit 578b2ef0bacc57f093150e4addd56b833ebbdf05
Author: Till Rohrmann 
Date:   2016-09-01T12:41:44Z

[FLINK-4456] Introduce TaskExecutionStateListener for Task

commit bd85b45e61160e5d86578da1e52c60ef1bde7c10
Author: Till Rohrmann 
Date:   2016-09-01T13:50:36Z

Replace JobManagerGateway in Task by InputSplitProvider and 
CheckpointNotifier

commit d665583ca03e6748187ada46ce30c39704b17fd8
Author: Till Rohrmann 
Date:   2016-09-01T14:36:31Z

Replace the TaskManager ActorGateway by TaskManagerConnection in Task




> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> expose the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of its RPC contract.
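The listener part of the proposal can be sketched like this (illustrative names modeled on the PR description; the real `TaskExecutionStateListener` API may differ): the `Task` only knows a listener interface, and an actor-backed implementation would forward each update as a message.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskListenerSketch {

    /** Guessed shape of the TaskExecutionStateListener from the PR description. */
    interface TaskExecutionStateListener {
        void notifyTaskExecutionStateChanged(String executionId, String newState);
    }

    /** Minimal task stand-in: it only knows the interface, not the actor system. */
    static final class Task {
        private final List<TaskExecutionStateListener> listeners = new ArrayList<>();
        private final String executionId;

        Task(String executionId) {
            this.executionId = executionId;
        }

        void registerExecutionListener(TaskExecutionStateListener listener) {
            listeners.add(listener);
        }

        void transitionTo(String newState) {
            for (TaskExecutionStateListener listener : listeners) {
                listener.notifyTaskExecutionStateChanged(executionId, newState);
            }
        }
    }

    public static void main(String[] args) {
        Task task = new Task("exec-42");
        List<String> updates = new ArrayList<>();
        // An ActorGateway-backed listener would wrap this call in a message instead.
        task.registerExecutionListener((id, state) -> updates.add(id + "->" + state));
        task.transitionTo("RUNNING");
        task.transitionTo("FINISHED");
        System.out.println(updates);
    }
}
```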





[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455668#comment-15455668
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2425
  
According to the design document, netty authentication is also part of this 
JIRA. Why was it not addressed?


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
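The shared-secret cookie idea can be sketched as follows. This is an illustrative sketch, not Flink's implementation: both sides are configured with the same cookie, and the receiving side rejects any connection whose cookie does not match, using a constant-time comparison.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class SharedSecretCookieSketch {

    private final byte[] expectedCookie;

    public SharedSecretCookieSketch(String configuredCookie) {
        this.expectedCookie = configuredCookie.getBytes(StandardCharsets.UTF_8);
    }

    /** Constant-time comparison so the match length is not leaked via timing. */
    public boolean isAuthorized(String presentedCookie) {
        byte[] presented = presentedCookie.getBytes(StandardCharsets.UTF_8);
        return MessageDigest.isEqual(expectedCookie, presented);
    }

    public static void main(String[] args) {
        SharedSecretCookieSketch check = new SharedSecretCookieSketch("s3cret");
        System.out.println(check.isAuthorized("s3cret")); // matching cookie is accepted
        System.out.println(check.isAuthorized("wrong"));  // mismatching cookie is rejected
    }
}
```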






[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455664#comment-15455664
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2425
  
I'm done with my initial review.
If you have a minute @mxm, it would be good if you could check the 
CliFrontend changes, to see if they fit the architecture well.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard






[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...

2016-09-01 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2425
  
I manually tested the code. TaskManagers are properly rejected on 
mismatching cookies; it works when they match.

One thing I found was that the error reporting is not very good:

```
robert@robert-da ...k-1.2-SNAPSHOT-bin/flink-1.2-SNAPSHOT 
(git)-[FLINK-3930] % ./bin/flink run -k asdf ./examples/batch/WordCount.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 0816b6a3427cdcfc1160655edc2cef09. Waiting for 
job completion.


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:380)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:367)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:322)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1003)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:292)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:377)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
    ... 19 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

But I think that this is akka's fault, not ours.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77181460
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -597,6 +610,11 @@ public static ContainerLaunchContext 
createTaskManagerContext(
 
containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, 
yarnClientUsername);
 
+   final String secureCookie = 
ENV.get(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE);
+   if(secureCookie != null) {
+   containerEnv.put(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE, 
secureCookie);
--- End diff --

The problem here is that the secure cookie will be put into the environment 
of the TaskManager JVM, so it'll be quite easy to just read the environment 
variables (not sure if that is an issue).
Another issue is that YARN is by default launching processes by creating a 
temporary bash file, with all the environment variables and the JVM invocation. 
So the secure cookie will be written into some tmp directory on YARN.
I wonder if there's some infrastructure in YARN to transfer the tokens in a 
secure way.
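For contrast, here is a stdlib-only sketch (not the YARN token mechanism the comment asks about, and not Flink code; the helper name is hypothetical) of handing a secret to a container via an owner-only file instead of an environment variable. Anything placed in `containerEnv` ends up both in the process environment and in YARN's generated launch script, whereas a `rw-------` file is at least guarded by filesystem permissions.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

public class SecretFileDemo {
    // Hypothetical helper: writes the secret to a file only the owner can
    // read or write, rather than exporting it as an environment variable.
    static Path writeSecret(String secret) throws IOException {
        Set<PosixFilePermission> ownerOnly =
                PosixFilePermissions.fromString("rw-------");
        Path file = Files.createTempFile("secure-cookie", ".secret",
                PosixFilePermissions.asFileAttribute(ownerOnly));
        Files.write(file, secret.getBytes(StandardCharsets.UTF_8));
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path f = writeSecret("foo");
        // Only the owning user can read the cookie back.
        System.out.println(Files.getPosixFilePermissions(f)
                .equals(PosixFilePermissions.fromString("rw-------")));
        Files.delete(f);
    }
}
```

This only narrows the exposure; a proper fix would use YARN's own credential-transfer facilities, as the comment suggests.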




[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455536#comment-15455536
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2450
  
I think these changes are good.

As a very personal and biased statement: I think the code overdoes it a bit 
with the "every parameter on a new line" policy. We have beautiful wide screens 
and I get sore fingers from scrolling up and down again to figure out enough 
context of a code passage ;-)

Also, the style breaks the "visual" separation between different parts of a 
statement, like "members of a return tuple" vs. "function parameters", or 
"clauses of an if statement" and "body of the if statement". I know the idea of 
that pattern was to improve code readability, but I am getting the feeling this 
is approaching a "verschlimmbesserung" (an improvement that makes things worse).



> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.
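The ephemeral-port behaviour this description relies on can be seen with a few lines of plain Java (a stdlib-only sketch, not Flink code): binding to port 0 asks the OS to pick any currently free port, so concurrently started test clusters never compete for a pre-determined one.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortDemo {
    public static void main(String[] args) throws IOException {
        // Port 0 tells the OS to assign any currently free ephemeral port.
        try (ServerSocket a = new ServerSocket(0);
             ServerSocket b = new ServerSocket(0)) {
            // Each socket gets its own distinct, positive port number.
            System.out.println(a.getLocalPort() > 0
                    && b.getLocalPort() > 0
                    && a.getLocalPort() != b.getLocalPort());
        }
    }
}
```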



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455534#comment-15455534
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77182333
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File 
getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to persist cookie for the appID: {} in {} ", 
appId, path);
+   try {
+   File f = new File(path);
+   if(!f.exists()) {
+   f.createNewFile();
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (subNode.containsKey(cookieKey)) {
+   String errorMessage = "Secure Cookie is already 
found in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   subNode.addProperty(cookieKey, cookie);
+   config.save();
+   LOG.debug("Persisted cookie for the appID: {}", appId);
+   } catch(Exception e) {
+   LOG.error("Exception occurred while persisting app 
state for app id: {}. Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+   }
+
+   public static String getAppSecureCookie(String appId) {
+   if(appId == null) {
+   String errorMessage = "Application ID cannot be null";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+
+   String cookieFromFile;
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to fetch cookie for the appID: {} from {}", 
appId, path);
+
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (!subNode.containsKey(cookieKey)) {
+   String errorMessage = "Could  not find the app 
ID section in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   cookieFromFile = subNode.getString(cookieKey, "");
+   if(cookieFromFile.length() == 0) {
+   String errorMessage = "Could  not find cookie 
in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   } catch(Exception e) {
+   LOG.error("Exception occurred while fetching cookie for 
app id: {} Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+
+   LOG.debug("Found cookie for the appID: {}", appId);
+   return cookieFromFile;
+   }
+
+   public static void removeAppState(String appId) {
+   if(appId == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to remove the reference for the appId: {} from 
{}", appId, path);
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.warn(errorMessage);
+ 

[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77182333
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File 
getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to persist cookie for the appID: {} in {} ", 
appId, path);
+   try {
+   File f = new File(path);
+   if(!f.exists()) {
+   f.createNewFile();
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (subNode.containsKey(cookieKey)) {
+   String errorMessage = "Secure Cookie is already 
found in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   subNode.addProperty(cookieKey, cookie);
+   config.save();
+   LOG.debug("Persisted cookie for the appID: {}", appId);
+   } catch(Exception e) {
+   LOG.error("Exception occurred while persisting app 
state for app id: {}. Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+   }
+
+   public static String getAppSecureCookie(String appId) {
+   if(appId == null) {
+   String errorMessage = "Application ID cannot be null";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+
+   String cookieFromFile;
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to fetch cookie for the appID: {} from {}", 
appId, path);
+
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   SubnodeConfiguration subNode = config.getSection(appId);
+   if (!subNode.containsKey(cookieKey)) {
+   String errorMessage = "Could  not find the app 
ID section in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   cookieFromFile = subNode.getString(cookieKey, "");
+   if(cookieFromFile.length() == 0) {
+   String errorMessage = "Could  not find cookie 
in "+ path + " for the appID: "+ appId;
+   LOG.error(errorMessage);
+   throw new RuntimeException(errorMessage);
+   }
+   } catch(Exception e) {
+   LOG.error("Exception occurred while fetching cookie for 
app id: {} Exception: {}", appId, e);
+   throw new RuntimeException(e);
+   }
+
+   LOG.debug("Found cookie for the appID: {}", appId);
+   return cookieFromFile;
+   }
+
+   public static void removeAppState(String appId) {
+   if(appId == null) { return; }
+   String path = System.getProperty("user.home") + File.separator 
+ fileName;
+   LOG.debug("Going to remove the reference for the appId: {} from 
{}", appId, path);
+   try {
+   File f = new File(path);
+   if (!f.exists()) {
+   String errorMessage = "Could not find the file: 
" + path + " in user home directory";
+   LOG.warn(errorMessage);
+   return;
+   }
+   HierarchicalINIConfiguration config = new 
HierarchicalINIConfiguration(path);
+   config.clearTree(appId);
+   config.save();
+

[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-01 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2450
  
I think these changes are good.

As a very personal and biased statement: I think the code overdoes it a bit 
with the "every parameter on a new line" policy. We have beautiful wide screens 
and I get sore fingers from scrolling up and down again to figure out enough 
context of a code passage ;-)

Also, the style breaks the "visual" separation between different parts of a 
statement, like "members of a return tuple" vs. "function parameters", or 
"clauses of an if statement" and "body of the if statement". I know the idea of 
that pattern was to improve code readability, but I am getting the feeling this 
is approaching a "verschlimmbesserung" (an improvement that makes things worse).





[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455531#comment-15455531
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77182101
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File 
getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
--- End diff --

please put `return` on a new line.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard
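One detail worth getting right when introducing a shared secret cookie is comparing it in constant time, so an attacker cannot learn the secret byte by byte from response timing. A minimal sketch (the helper name and wiring are hypothetical, not Flink's actual API; `MessageDigest.isEqual` is the JDK's time-constant comparison):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class CookieCheck {
    // Hypothetical helper: compares a received cookie against the configured
    // secret in constant time, so timing does not leak matching prefixes.
    static boolean cookieMatches(String expected, String received) {
        if (expected == null || received == null) {
            return false;
        }
        return MessageDigest.isEqual(
                expected.getBytes(StandardCharsets.UTF_8),
                received.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(cookieMatches("foo", "foo")); // true
        System.out.println(cookieMatches("foo", "bar")); // false
    }
}
```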





[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77182101
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File 
getYarnPropertiesLocation(Configuration conf) {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + 
currentUser);
}
 
+   public static void persistAppState(String appId, String cookie) {
+   if(appId == null || cookie == null) { return; }
--- End diff --

please put `return` on a new line.




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455524#comment-15455524
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77181663
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -439,8 +450,8 @@ public static void runInteractiveCli(YarnClusterClient 
yarnCluster, boolean read
case "quit":
case "stop":

yarnCluster.shutdownCluster();
+   if 
(yarnCluster.hasBeenShutdown()) { removeAppState(applicationId); }
--- End diff --

This is not consistent with the style in the rest of the Flink codebase. 
Can you put the "removeAppState()" call on a separate line?


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77181663
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -439,8 +450,8 @@ public static void runInteractiveCli(YarnClusterClient 
yarnCluster, boolean read
case "quit":
case "stop":

yarnCluster.shutdownCluster();
+   if 
(yarnCluster.hasBeenShutdown()) { removeAppState(applicationId); }
--- End diff --

This is not consistent with the style in the rest of the Flink codebase. 
Can you put the "removeAppState()" call on a separate line?




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455520#comment-15455520
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77181460
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -597,6 +610,11 @@ public static ContainerLaunchContext 
createTaskManagerContext(
 
containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, 
yarnClientUsername);
 
+   final String secureCookie = 
ENV.get(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE);
+   if(secureCookie != null) {
+   containerEnv.put(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE, 
secureCookie);
--- End diff --

The problem here is that the secure cookie will be put into the environment 
of the TaskManager JVM, so it'll be quite easy to just read the environment 
variables (not sure if that is an issue).
Another issue is that YARN is by default launching processes by creating a 
temporary bash file, with all the environment variables and the JVM invocation. 
So the secure cookie will be written into some tmp directory on YARN.
I wonder if there's some infrastructure in YARN to transfer the tokens in a 
secure way.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455509#comment-15455509
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2450#discussion_r77180730
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 ---
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
 ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
 InetAddress.getByName("localhost").getHostAddress())
 
-  val configuration = generateConfiguration(userConfiguration)
+  protected val _configuration = generateConfiguration(userConfiguration)
--- End diff --

Why the underscore here?


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[GitHub] flink pull request #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by L...

2016-09-01 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2450#discussion_r77180730
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 ---
@@ -69,7 +69,7 @@ abstract class FlinkMiniCluster(
 ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
 InetAddress.getByName("localhost").getHostAddress())
 
-  val configuration = generateConfiguration(userConfiguration)
+  protected val _configuration = generateConfiguration(userConfiguration)
--- End diff --

Why the underscore here?




[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455506#comment-15455506
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77180355
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+public class BlobClientSecureTest extends BlobClientTest {
+
+   /**
+* Starts the BLOB server with secure cookie enabled configuration
+*/
+   @BeforeClass
+   public static void startServer() {
+   try {
+   config.setBoolean(ConfigConstants.SECURITY_ENABLED, 
true);
+   config.setString(ConfigConstants.SECURITY_COOKIE, 
"foo");
+   BLOB_SERVER = new BlobServer(config);
+   }
+   catch (IOException e) {
+   e.printStackTrace();
+   fail(e.getMessage());
+   }
--- End diff --

Maybe it makes sense to add a test ensuring that clients with a wrong 
cookie are rejected:

```
@Test(expected = IOException.class)
public void testInvalidSecurityCookie() throws IOException {

    BlobClient client = null;

    try {
        byte[] testBuffer = createTestBuffer();
        MessageDigest md = BlobUtils.createMessageDigest();
        md.update(testBuffer);

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
        client = new BlobClient(serverAddress, "different");

        // Store some data
        client.put(testBuffer);
    }
    finally {
        if (client != null) {
            try {
                client.close();
            } catch (Throwable t) {}
        }
    }
}
```


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77180355
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+public class BlobClientSecureTest extends BlobClientTest {
+
+   /**
+* Starts the BLOB server with secure cookie enabled configuration
+*/
+   @BeforeClass
+   public static void startServer() {
+   try {
+   config.setBoolean(ConfigConstants.SECURITY_ENABLED, 
true);
+   config.setString(ConfigConstants.SECURITY_COOKIE, 
"foo");
+   BLOB_SERVER = new BlobServer(config);
+   }
+   catch (IOException e) {
+   e.printStackTrace();
+   fail(e.getMessage());
+   }
--- End diff --

Maybe it makes sense to add a test ensuring that clients with a wrong 
cookie are rejected:

```
@Test(expected = IOException.class)
public void testInvalidSecurityCookie() throws IOException {

    BlobClient client = null;

    try {
        byte[] testBuffer = createTestBuffer();
        MessageDigest md = BlobUtils.createMessageDigest();
        md.update(testBuffer);

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
        client = new BlobClient(serverAddress, "different");

        // Store some data
        client.put(testBuffer);
    }
    finally {
        if (client != null) {
            try {
                client.close();
            } catch (Throwable t) {}
        }
    }
}
```




[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455491#comment-15455491
 ] 

ASF GitHub Bot commented on FLINK-4510:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2453
  
Thanks for looking into this.

I wonder if we can do this simpler, without changes to the 
CheckpointCoordinator.
The only thing that really needs to change is that without periodic 
checkpoints, the `startCheckpointScheduler()` method is not called, which means 
that the JobStatusListener that "CheckpointActivatorDeactivator" should not be 
created and started.



> Always create CheckpointCoordinator
> ---
>
> Key: FLINK-4510
> URL: https://issues.apache.org/jira/browse/FLINK-4510
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Jark Wu
>
> The checkpoint coordinator is only created if a checkpointing interval is 
> configured. This means that no savepoints can be triggered if there is no 
> checkpointing interval specified.
> Instead we should always create it and allow an interval of 0 for disabled 
> periodic checkpoints. 
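The shape of the change Stephan suggests can be sketched in a few lines (all class names here are stand-in stubs, not Flink's real API): the coordinator is always created so savepoints work, but the job-status listener that starts the periodic scheduler is only wired up when a positive interval is configured.

```java
public class CheckpointSetupSketch {
    // Stub standing in for the real CheckpointCoordinator; only the
    // guard logic below is the point of this sketch.
    static class CheckpointCoordinator {
        final long intervalMillis;
        boolean schedulerStarted;
        CheckpointCoordinator(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }
        void startCheckpointScheduler() {
            schedulerStarted = true;
        }
    }

    // Always create the coordinator (so savepoints can be triggered), but
    // only enable periodic scheduling for a positive interval.
    static CheckpointCoordinator setUp(long intervalMillis) {
        CheckpointCoordinator coordinator = new CheckpointCoordinator(intervalMillis);
        if (intervalMillis > 0) {
            // In the real code this is where the CheckpointActivatorDeactivator
            // job-status listener would be registered and started.
            coordinator.startCheckpointScheduler();
        }
        return coordinator;
    }

    public static void main(String[] args) {
        System.out.println(setUp(0).schedulerStarted);      // false
        System.out.println(setUp(60_000).schedulerStarted); // true
    }
}
```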









[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455477#comment-15455477
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77178647
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+public class BlobClientSecureTest extends BlobClientTest {
--- End diff --

Why are tests like the `testContentAddressableBuffer()` not failing, even 
if they pass a `null` security cookie?


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[jira] [Created] (FLINK-4557) Table API Stream Aggregations

2016-09-01 Thread Timo Walther (JIRA)
Timo Walther created FLINK-4557:
---

 Summary: Table API Stream Aggregations
 Key: FLINK-4557
 URL: https://issues.apache.org/jira/browse/FLINK-4557
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther


The Table API is a declarative API to define queries on static and streaming 
tables. So far, only projection, selection, and union are supported operations 
on streaming tables.

This issue and the corresponding FLIP proposes to add support for different 
types of aggregations on top of streaming tables. In particular, we seek to 
support:

*Group-window aggregates*, i.e., aggregates which are computed for a group of 
elements. A (time or row-count) window is required to bound the infinite input 
stream into finite groups.

*Row-window aggregates*, i.e., aggregates which are computed for each row, 
based on a window (range) of preceding and succeeding rows.

Each type of aggregate shall be supported on keyed/grouped and 
non-keyed/non-grouped data streams, for streaming tables as well as batch tables.

Since time-windowed aggregates will be the first operation that requires the 
definition of time, we also need to discuss how the Table API handles time 
characteristics, timestamps, and watermarks.

The FLIP can be found here: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
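
As a conceptual sketch of a group-window (tumbling) aggregate — illustrative plain Java, not the proposed Table API syntax: assigning each element to the window that contains its timestamp is what turns the infinite stream into finite, aggregatable groups.

```java
import java.util.HashMap;
import java.util.Map;

/** Conceptual sketch of a tumbling group-window SUM aggregate (names illustrative). */
class TumblingWindowSketch {

    /** Start of the tumbling window of the given size containing the timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Sums values per window start; a stand-in for a grouped SUM over a window. */
    static Map<Long, Long> sumPerWindow(long[] timestamps, long[] values, long windowSizeMillis) {
        Map<Long, Long> sums = new HashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            // merge accumulates per finite window bucket instead of per infinite stream
            sums.merge(windowStart(timestamps[i], windowSizeMillis), values[i], Long::sum);
        }
        return sums;
    }
}
```

The actual operator syntax, time characteristics, and watermark handling are what the FLIP linked above discusses.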





[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455463#comment-15455463
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77177744
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -2041,6 +2044,18 @@ object JobManager {
 null
 }
 
+val secureAuth = 
configuration.getBoolean(ConfigConstants.SECURITY_ENABLED,
+  
ConfigConstants.DEFAULT_SECURITY_ENABLED)
+if(secureAuth == true) {
+  val secureCookie = 
configuration.getString(ConfigConstants.SECURITY_COOKIE, null)
+  if(secureCookie == null) {
+val message = "Missing " + ConfigConstants.SECURITY_COOKIE +
+  " configuration in Flink configuration file"
+LOG.error(message)
+throw new RuntimeException(message)
+  }
+}
--- End diff --

As I said before, this is duplicate code that can be encapsulated into a 
`SecurityUtils` static method.
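
One way the duplicated check could be factored out, as a hedged sketch — the class and key names below are assumptions for illustration, not the actual Flink `SecurityUtils`/`ConfigConstants` API; a plain `Map` stands in for Flink's `Configuration`:

```java
import java.util.Map;

/** Illustrative only: the repeated secure-cookie validation as one static helper. */
final class SecureCookieCheck {

    // Stand-ins for the ConfigConstants keys; the real key names are assumptions.
    static final String SECURITY_ENABLED = "security.enabled";
    static final String SECURITY_COOKIE = "security.cookie";

    private SecureCookieCheck() {}

    /** Returns the cookie, or null if security is disabled; throws if enabled but unset. */
    static String validateSecureCookie(Map<String, String> configuration) {
        boolean secureAuth = Boolean.parseBoolean(
                configuration.getOrDefault(SECURITY_ENABLED, "false"));
        if (!secureAuth) {
            return null; // security disabled: nothing to validate
        }
        String cookie = configuration.get(SECURITY_COOKIE);
        if (cookie == null) {
            throw new RuntimeException(
                    "Missing " + SECURITY_COOKIE + " configuration in Flink configuration file");
        }
        return cookie;
    }
}
```

JobManager and TaskManager would then both call this one helper instead of repeating the block.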


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455465#comment-15455465
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77177808
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -1470,6 +1470,18 @@ object TaskManager {
 null
 }
 
+val secureAuth = 
configuration.getBoolean(ConfigConstants.SECURITY_ENABLED,
+  
ConfigConstants.DEFAULT_SECURITY_ENABLED)
+if(secureAuth == true) {
+  val secureCookie = 
configuration.getString(ConfigConstants.SECURITY_COOKIE, null)
+  if(secureCookie == null) {
+val message = "Missing " + ConfigConstants.SECURITY_COOKIE +
+  " configuration in Flink configuration file"
+LOG.error(message)
+throw new RuntimeException(message)
+  }
+}
+
--- End diff --

Duplicate code


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard









[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455428#comment-15455428
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2449#discussion_r77175883
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -18,130 +18,88 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The 
network environment contains
  * the data structures that keep track of all intermediate results and all 
data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network 
buffer pool.
- * All other components (netty, intermediate result managers, ...) are 
only created once the
- * environment is "associated" with a TaskManager and JobManager. This 
happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
 
private final Object lock = new Object();
 
-   private final NetworkEnvironmentConfiguration configuration;
-
-   private final FiniteDuration jobManagerTimeout;
-
private final NetworkBufferPool networkBufferPool;
 
-   private ConnectionManager connectionManager;
+   private final ConnectionManager connectionManager;
 
-   private ResultPartitionManager partitionManager;
+   private final ResultPartitionManager resultPartitionManager;
 
-   private TaskEventDispatcher taskEventDispatcher;
-
-   private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-   private PartitionStateChecker partitionStateChecker;
+   private final TaskEventDispatcher taskEventDispatcher;
 
/** Server for {@link org.apache.flink.runtime.state.KvState} requests. 
*/
-   private KvStateServer kvStateServer;
+   private 



[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455431#comment-15455431
 ] 

ASF GitHub Bot commented on FLINK-4455:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2449
  
+1 for moving the `PartitionStateChecker` and 
`ResultPartitionConsumableNotifier` out of the `NetworkEnvironment`.

Few questions and comments:

  - Do we need an extra ExecutorService in the TaskManager? I have been 
digging through a bunch of thread dumps over time, and there are already many 
threads and pools. I would really like to avoid having yet another 
thread pool (creating thread pools should be an extremely careful decision).

  The Akka thread pool executor is quite over-provisioned for the few 
actors we actually use. I think it is perfectly feasible to use that one for 
the few extra futures introduced here. In any case, if not reusing the Akka 
executor pool, then the thread pool needs to be shut down in the TaskManager 
runner. Otherwise it creates a leak when running successive local Flink jobs.

  - I am a bit confused about the `SlotEnvironment`. Maybe it is mainly the 
name, but what does it have to do with slots? Is it not more like a 
network-messages specific *JobManager Connection*?
  
  - The `ResultPartitionConsumableNotifier` could be per `Task` - that way, 
future multi-JobManager associations would work seamlessly, and it could 
directly call `fail(...)` on the Task without having to go through the 
`TaskManager`. It could probably leave the TaskManager out of the picture 
completely.
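
The executor-lifecycle concern above can be sketched as follows (illustrative names, not Flink code): a component that reuses a shared executor must not shut it down, while a component that creates its own pool owns the shutdown — otherwise successive local jobs leak threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of executor ownership and cleanup. */
class PooledComponent implements AutoCloseable {
    final ExecutorService executor;
    private final boolean ownsExecutor;

    /** Reuse an existing pool, e.g. the actor system's dispatcher. */
    PooledComponent(ExecutorService sharedExecutor) {
        this.executor = sharedExecutor;
        this.ownsExecutor = false;
    }

    /** Create a dedicated pool; this component then owns its shutdown. */
    PooledComponent() {
        this.executor = Executors.newFixedThreadPool(2);
        this.ownsExecutor = true;
    }

    @Override
    public void close() throws InterruptedException {
        if (ownsExecutor) { // only shut down pools we created ourselves
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```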


> Replace ActorGateways in NetworkEnvironment by interfaces
> -
>
> Key: FLINK-4455
> URL: https://issues.apache.org/jira/browse/FLINK-4455
> Project: Flink
>  Issue Type: Improvement
>  Components: Network, TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.
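
A minimal sketch of the proposed decoupling, with illustrative names rather than Flink's actual types: the NetworkEnvironment depends on a small interface, the current actor-based implementation wraps each call into a message for the `ActorGateway`, and in Flip-6 the `JobMaster`/`TaskExecutor` implement the interface directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Interface replacing a direct ActorGateway dependency (name is illustrative). */
interface TaskFailureNotifier {
    void failTask(String executionAttemptId, Throwable cause);
}

/** Test stub standing in for the actor-backed implementation. */
class RecordingTaskFailureNotifier implements TaskFailureNotifier {
    final List<String> failedTasks = new ArrayList<>();

    @Override
    public void failTask(String executionAttemptId, Throwable cause) {
        // a real implementation would wrap this call into a FailTask message
        failedTasks.add(executionAttemptId);
    }
}
```

Callers are then testable with plain stubs, with no actor system required.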






[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization

2016-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455423#comment-15455423
 ] 

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77175545
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -530,6 +529,7 @@ public void uploadRequiredJarFiles(InetSocketAddress 
serverAddress) throws IOExc
}
}
}
+   */
--- End diff --

I think it's better to delete the method completely rather than commenting it 
out.


> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard





[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...

2016-09-01 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2425#discussion_r77175545
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -530,6 +529,7 @@ public void uploadRequiredJarFiles(InetSocketAddress 
serverAddress) throws IOExc
}
}
}
+   */
--- End diff --

I think its better to completely delete the method instead of commenting it 
out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

