[jira] [Commented] (FLINK-4439) Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
[ https://issues.apache.org/jira/browse/FLINK-4439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457726#comment-15457726 ]

ASF GitHub Bot commented on FLINK-4439:
---------------------------------------

Github user gheo21 commented on the issue:

    https://github.com/apache/flink/pull/2397

    Hi @rmetzger, thanks for taking a look. Do I need to do anything else to get it merged?

> Error message KafkaConsumer08 when all 'bootstrap.servers' are invalid
> ----------------------------------------------------------------------
>
>                 Key: FLINK-4439
>                 URL: https://issues.apache.org/jira/browse/FLINK-4439
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming Connectors
>    Affects Versions: 1.0.3
>            Reporter: Gheorghe Gheorghe
>            Priority: Minor
>
> The "flink-connector-kafka-0.8_2" connector logs the following error when all
> 'bootstrap.servers' passed to the FlinkKafkaConsumer08 are invalid.
> See stacktrace:
> {code:title=stacktrace|borderStyle=solid}
> 2016-08-21 15:22:30 WARN  FlinkKafkaConsumerBase:290 - Error communicating with broker inexistentKafkHost:9092 to find partitions for [testTopic]. class java.nio.channels.ClosedChannelException. Message: null
> 2016-08-21 15:22:30 DEBUG FlinkKafkaConsumerBase:292 - Detailed trace
> java.nio.channels.ClosedChannelException
> 	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> 	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> 	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> 	at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:91)
> 	at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:68)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.getPartitionsForTopic(FlinkKafkaConsumer08.java:264)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:193)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:164)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.<init>(FlinkKafkaConsumer08.java:131)
> 	at MetricsFromKafka$.main(MetricsFromKafka.scala:38)
> 	at MetricsFromKafka.main(MetricsFromKafka.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at sbt.Run.invokeMain(Run.scala:67)
> 	at sbt.Run.run0(Run.scala:61)
> 	at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
> 	at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
> 	at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
> 	at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
> 	at sbt.Logger$$anon$4.apply(Logger.scala:84)
> 	at sbt.TrapExit$App.run(TrapExit.scala:248)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> From the above stacktrace it is hard to figure out that the servers provided in the config cannot be resolved to a valid IP address.
> Moreover, the Flink Kafka consumer will try each of those servers one by one, failing to get partition information from every one of them.
> The suggested improvement is to fail fast and tell the user that the servers provided in the 'bootstrap.servers' config are invalid. If at least one server is valid, then the exception should not be thrown.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
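The fail-fast idea described above can be sketched as a small hostname check. This is a hedged illustration only, not the connector's actual code: the class and method names are made up, and the only assumption about the config is Kafka's documented comma-separated host:port format for 'bootstrap.servers'.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: returns true if at least one entry of a
// comma-separated 'bootstrap.servers' value resolves via DNS.
public class BootstrapServersCheck {

    public static boolean atLeastOneResolvable(String bootstrapServers) {
        for (String hostPort : bootstrapServers.split(",")) {
            String host = hostPort.trim().split(":")[0];
            try {
                InetAddress.getByName(host);
                return true; // one resolvable server is enough: don't fail
            } catch (UnknownHostException e) {
                // this entry is invalid; keep trying the remaining servers
            }
        }
        // every entry failed to resolve: a consumer could fail fast here
        // with a clear message instead of surfacing ClosedChannelException
        return false;
    }

    public static void main(String[] args) {
        System.out.println(atLeastOneResolvable("localhost:9092"));
        System.out.println(atLeastOneResolvable("inexistentKafkHost.invalid:9092"));
    }
}
```

Running the check before opening any broker connection would surface a clear "all 'bootstrap.servers' entries are unresolvable" error instead of the ClosedChannelException above.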
[GitHub] flink issue #2397: [FLINK-4439] Validate 'bootstrap.servers' config in flink...
Github user gheo21 commented on the issue:

    https://github.com/apache/flink/pull/2397

    Hi @rmetzger, thanks for taking a look. Do I need to do anything else to get it merged?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[jira] [Created] (FLINK-4562) table example make an divided module in flink-examples
shijinkui created FLINK-4562:

Summary: table example make an divided module in flink-examples
Key: FLINK-4562
URL: https://issues.apache.org/jira/browse/FLINK-4562
Project: Flink
Issue Type: Improvement
Reporter: shijinkui

Example code shouldn't be packaged in the table module.
[jira] [Updated] (FLINK-4562) table examples make an divided module in flink-examples
[ https://issues.apache.org/jira/browse/FLINK-4562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shijinkui updated FLINK-4562:
-----------------------------
Summary: table examples make an divided module in flink-examples (was: table example make an divided module in flink-examples)

> table examples make an divided module in flink-examples
> -------------------------------------------------------
>
>                 Key: FLINK-4562
>                 URL: https://issues.apache.org/jira/browse/FLINK-4562
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: shijinkui
>
> Example code shouldn't be packaged in the table module.
[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager
[ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457533#comment-15457533 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4536 at 9/2/16 5:07 AM:
--------------------------------------------------------------------

Hi [~skidder], just to clarify: the thread leak issue should be resolved after the fix on your InfluxDB sink and, per the discussion above, should be independent of the Kinesis connector, correct? The cached thread pool created by the Kinesis connector is also irrelevant to the AWS SDK and KCL versions used, and didn't seem to be causing any thread leaks. So, I would suggest that this particular issue is resolved ;)

As for the {{ProvisionedThroughputExceeded}} failures: the connector fails after hitting 3 consecutive ProvisionedThroughputExceeded exceptions. This is an AWS service usage limitation (http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). The normal way to tune this is to set {{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value in the consumer's configuration properties (the default is a 0 interval; at most 1000 ms should be sufficient for most cases), so that the consumer subtasks don't fetch records as frequently, and to accordingly increase {{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more records on each fetch. If you are suggesting that the AWS SDK version bump to 1.11.30 somehow lessens encountering {{ProvisionedThroughputExceeded}}, could you open a separate JIRA for it so we can continue that discussion over there?

As for the KCL, it should be irrelevant to the issues you were facing; we're only using a single utility class from it to deaggregate Kinesis records.

> Possible thread leak in Task Manager
> ------------------------------------
>
>                 Key: FLINK-4536
>                 URL: https://issues.apache.org/jira/browse/FLINK-4536
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.1.0
>            Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a Kinesis stream configured with 2 shards. I've limited the Kinesis source to a parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput limits being hit. The application automatically restarts, and resumes processing with the checkpoint stored on the Job Manager with no outward indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the number of threads running on each Task Manager increases by about 20 threads each time the application restarts. Over the course of a day the application might hit provisioned-throughput limits 20 times or so (this is not fully production yet, so hitting these limits is acceptable for now). But the number of threads continues to grow unbounded with no increase in workload on the Task Managers.
> The following link includes charts for the overall Flink cluster performance & Task Manager JVM thread
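The tuning advice in the comment above can be sketched as consumer configuration like the following. This is a hedged illustration: the string keys are assumed to be the values that `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` and `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` resolve to in the Flink Kinesis connector, and real code should prefer the constants themselves; the 10000 record count is an arbitrary example value.

```java
import java.util.Properties;

public class KinesisConsumerTuning {

    // Builds consumer properties that poll each shard less often but fetch
    // more records per GetRecords call, to stay under the Kinesis
    // provisioned-throughput limits.
    public static Properties tunedConsumerConfig() {
        Properties config = new Properties();
        // wait up to 1000 ms between GetRecords calls per shard (default is 0,
        // i.e. continuous polling); assumed key for SHARD_GETRECORDS_INTERVAL_MILLIS
        config.setProperty("flink.shard.getrecords.intervalmillis", "1000");
        // compensate for the lower poll frequency by fetching more records per
        // call; assumed key for SHARD_GETRECORDS_MAX, example value
        config.setProperty("flink.shard.getrecords.maxrecordcount", "10000");
        return config;
    }
}
```

These properties would then be passed to the FlinkKinesisConsumer constructor alongside the usual region and credential settings.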
[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager
[ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457533#comment-15457533 ]

Tzu-Li (Gordon) Tai commented on FLINK-4536:
--------------------------------------------

Hi [~skidder], just to clarify: the thread leak issue should be resolved after the fix on your InfluxDB sink and, per the discussion above, should be independent of the Kinesis connector, correct? The cached thread pool created by the Kinesis connector is also irrelevant to the AWS SDK and KCL versions used. So, I would suggest that this particular issue is resolved ;)

As for the ProvisionedThroughputExceeded failures: the connector fails after hitting 3 consecutive ProvisionedThroughputExceeded exceptions. This is an AWS service usage limitation (http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html). The normal way to tune this is to set {{ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS}} to a higher value in the consumer's configuration properties (the default is a 0 interval; at most 1000 ms should be sufficient for most cases), so that the consumer subtasks don't fetch records as frequently, and to accordingly increase {{ConsumerConfigConstants.SHARD_GETRECORDS_MAX}} to let Kinesis return more records on each fetch. If you are suggesting that the AWS SDK version bump to 1.11.30 somehow lessens encountering ProvisionedThroughputExceeded, could you open a separate JIRA for it so we can continue that discussion over there?

As for the KCL, it should be irrelevant to the issues you were facing; we're only using a single utility class from it to deaggregate Kinesis records.

> Possible thread leak in Task Manager
> ------------------------------------
>
>                 Key: FLINK-4536
>                 URL: https://issues.apache.org/jira/browse/FLINK-4536
>             Project: Flink
>          Issue Type: Bug
>          Components: TaskManager
>    Affects Versions: 1.1.0
>            Reporter: Scott Kidder
>
> Running Flink release 1.1.1 commit 61bfb36 in the following configuration:
> Job Manager
> 2 x Task Manager (2 CPU cores on each Task Manager)
> I've also updated the Kinesis source to use the latest AWS Java SDK, release 1.11.29.
> I've got a single Flink application using all 4 slots. It consumes from a Kinesis stream configured with 2 shards. I've limited the Kinesis source to a parallelism of 2 as a workaround for FLINK-4341.
> Occasionally the Kinesis consumer fails because of provisioned-throughput limits being hit. The application automatically restarts, and resumes processing with the checkpoint stored on the Job Manager with no outward indication of problems.
> I recently enabled the StatsD metrics reporter in Flink and noticed that the number of threads running on each Task Manager increases by about 20 threads each time the application restarts. Over the course of a day the application might hit provisioned-throughput limits 20 times or so (this is not fully production yet, so hitting these limits is acceptable for now). But the number of threads continues to grow unbounded with no increase in workload on the Task Managers.
> The following link includes charts for the overall Flink cluster performance & Task Manager JVM threads over the course of 12 hours: http://imgur.com/a/K59hz
> Each decrease and subsequent spike in threads corresponds to the job being restarted due to an AWS Kinesis source error.
> Here are the logs from one of the Task Manager instances on startup:
> {code}
> 2016-08-30 14:52:50,438 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager -
> 2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC)
> 2016-08-30 14:52:50,540 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Current user: root
> 2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14
> 2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap size: 2048 MiBytes
> 2016-08-30 14:52:50,541 INFO  org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: /usr/lib/jvm/java-1.8-openjdk/jre
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.7.2
> 2016-08-30 14:52:50,543 INFO  org.apache.flink.runtime.taskmanager.TaskManager
[jira] [Commented] (FLINK-4561) replace all the scala version as a `scala.binary.version` property
[ https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457364#comment-15457364 ]

ASF GitHub Bot commented on FLINK-4561:
---------------------------------------

GitHub user shijinkui opened a pull request:

    https://github.com/apache/flink/pull/2459

    [FLINK-4561] replace all the scala version as a `scala.binary.version` property

    Replace all uses of the Scala version (2.10) with a property `scala.binary.version` defined in the root pom properties. The default Scala version property is 2.10.

    Modifications:
    1. dependencies include the Scala version
    2. module definitions include the Scala version
    3. Scala version upgraded from 2.11.7 to 2.11.8

    Only pom files are modified.

The pull request references the related JIRA issue ("[FLINK-4561] replace all the scala version as a `scala.binary.version` property")

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shijinkui/flink scala_version_property_replace

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2459.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2459

commit e8feaabcb4dc853c6e5b895c2351ea3d96351096
Author: shijinkui
Date:   2016-09-02T03:14:07Z

    [FLINK-4561] replace all the scala version as a `scala.binary.version` property

> replace all the scala version as a `scala.binary.version` property
> ------------------------------------------------------------------
>
>                 Key: FLINK-4561
>                 URL: https://issues.apache.org/jira/browse/FLINK-4561
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: shijinkui
>
> Replace all uses of the Scala version (2.10) with a property `scala.binary.version` defined in the root pom properties. The default Scala version property is 2.10.
> Modifications:
> 1. dependencies include the Scala version
> 2. module definitions include the Scala version
> 3. Scala version upgraded from 2.11.7 to 2.11.8
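A minimal sketch of what the described change looks like in a pom. This is illustrative only: the property name comes from the PR, while the dependency shown is an arbitrary example of a module whose Scala suffix becomes parameterized.

```xml
<!-- In the root pom: define the Scala binary version once. -->
<properties>
  <scala.binary.version>2.10</scala.binary.version>
</properties>

<!-- Before: the artifactId hard-codes the Scala binary version. -->
<!-- <artifactId>flink-streaming-scala_2.10</artifactId> -->

<!-- After: the suffix comes from the property, so switching Scala
     versions means changing a single property. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
</dependency>
```

Building for Scala 2.11 then reduces to overriding the property, e.g. `mvn -Dscala.binary.version=2.11 ...`, rather than editing every module pom.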
[jira] [Updated] (FLINK-4561) replace all the scala version as a `scala.binary.version` property
[ https://issues.apache.org/jira/browse/FLINK-4561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

shijinkui updated FLINK-4561:
-----------------------------
Summary: replace all the scala version as a `scala.binary.version` property (was: replace all the scala version as a property)

> replace all the scala version as a `scala.binary.version` property
> ------------------------------------------------------------------
>
>                 Key: FLINK-4561
>                 URL: https://issues.apache.org/jira/browse/FLINK-4561
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: shijinkui
>
> Replace all uses of the Scala version (2.10) with a property `scala.binary.version` defined in the root pom properties. The default Scala version property is 2.10.
> Modifications:
> 1. dependencies include the Scala version
> 2. module definitions include the Scala version
> 3. Scala version upgraded from 2.11.7 to 2.11.8
[jira] [Created] (FLINK-4561) replace all the scala version as a property
shijinkui created FLINK-4561:

Summary: replace all the scala version as a property
Key: FLINK-4561
URL: https://issues.apache.org/jira/browse/FLINK-4561
Project: Flink
Issue Type: Improvement
Reporter: shijinkui

Replace all uses of the Scala version (2.10) with a property `scala.binary.version` defined in the root pom properties. The default Scala version property is 2.10.

Modifications:
1. dependencies include the Scala version
2. module definitions include the Scala version
3. Scala version upgraded from 2.11.7 to 2.11.8
[jira] [Commented] (FLINK-4560) enforcer java version as 1.7
[ https://issues.apache.org/jira/browse/FLINK-4560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15457312#comment-15457312 ]

ASF GitHub Bot commented on FLINK-4560:
---------------------------------------

GitHub user shijinkui opened a pull request:

    https://github.com/apache/flink/pull/2458

    [FLINK-4560] enforcer java version as 1.7

    [FLINK-4560](https://issues.apache.org/jira/browse/FLINK-4560) enforcer java version as 1.7

    1. maven-enforcer-plugin: add a Java version enforcement rule
    2. maven-enforcer-plugin: upgrade to 1.4.1, explicitly requiring the Java version

    Only the root pom file is modified.

The pull request references the related JIRA issue ("[FLINK-4560] enforcer java version as 1.7")

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shijinkui/flink enforcer_java_version_7

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2458.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2458

commit d4fe8b21ea6680832328cc93b620973bee57ab03
Author: shijinkui
Date:   2016-09-02T02:46:45Z

    [FLINK-4560] enforcer java version as 1.7

> enforcer java version as 1.7
> ----------------------------
>
>                 Key: FLINK-4560
>                 URL: https://issues.apache.org/jira/browse/FLINK-4560
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: shijinkui
>
> 1. maven-enforcer-plugin: add a Java version enforcement rule
> 2. maven-enforcer-plugin: upgrade to 1.4.1, explicitly requiring the Java version
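A minimal sketch of the kind of enforcer configuration the PR describes, using the plugin's standard `requireJavaVersion` rule. The execution id is an assumption; the plugin version (1.4.1) and the Java 1.7 floor come from the PR description.

```xml
<!-- Root pom: fail the build early if it is run with a JDK older than 1.7. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>1.4.1</version>
  <executions>
    <execution>
      <id>enforce-java-version</id> <!-- id is an arbitrary example -->
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <requireJavaVersion>
            <!-- Maven version-range syntax: 1.7 or newer -->
            <version>[1.7,)</version>
          </requireJavaVersion>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With this in place, `mvn verify` under a 1.6 JDK aborts with an enforcer error instead of producing artifacts that fail at runtime.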
[jira] [Created] (FLINK-4560) enforcer java version as 1.7
shijinkui created FLINK-4560: Summary: enforcer java version as 1.7 Key: FLINK-4560 URL: https://issues.apache.org/jira/browse/FLINK-4560 Project: Flink Issue Type: Improvement Reporter: shijinkui 1. Add a Java version check to maven-enforcer-plugin 2. Upgrade maven-enforcer-plugin to 1.4.1 and explicitly require the Java version
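The change the issue describes can be sketched as a plugin entry in the root pom.xml. This is a hedged illustration based only on the issue text (enforcer 1.4.1, Java 1.7); the `requireJavaVersion` rule is the standard maven-enforcer-plugin rule for this, but the exact configuration in the actual PR may differ:

```xml
<!-- Sketch: enforce a minimum Java version at build time.
     Placement and execution id are illustrative, not taken from the PR. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>1.4.1</version>
  <executions>
    <execution>
      <id>enforce-java-version</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <requireJavaVersion>
            <version>1.7</version>
          </requireJavaVersion>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With this in the root pom, `mvn verify` fails fast on a JDK older than 1.7 instead of producing a confusing compile error later.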
[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager
[ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456481#comment-15456481 ] Scott Kidder commented on FLINK-4536: - On second thought, I'll leave the issue open and allow the Flink team to decide whether to update the version of the AWS SDK and KCL referenced in the Kinesis connector pom.xml. > Possible thread leak in Task Manager > > > Key: FLINK-4536 > URL: https://issues.apache.org/jira/browse/FLINK-4536 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.1.0 >Reporter: Scott Kidder > > Running Flink release 1.1.1 commit 61bfb36 in the following configuration: > Job Manager > 2 x Task Manager (2 CPU cores on each Task Manager) > I've also updated the Kinesis source to use the latest AWS Java SDK, release > 1.11.29. > I've got a single Flink application using all 4 slots. It consumes from a > Kinesis stream configured with 2 shards. I've limited the Kinesis source to a > parallelism of 2 as a workaround for FLINK-4341. > Occasionally the Kinesis consumer fails because of provisioned-throughput > limits being hit. The application automatically restarts, and resumes > processing with the checkpoint stored on the Job Manager with no outward > indication of problems. > I recently enabled the StatsD metrics reporter in Flink and noticed that the > number of threads running on each Task Manager increases by about 20 threads > each time the application restarts. Over the course of a day the application > might hit provisioned-throughput limits 20 times or so (this is not fully > production yet, so hitting these limits is acceptable for now). But the > number of threads continues to grow unbounded with no increase in workload on > the Task Managers. 
> The following link includes charts for the overall Flink cluster performance > & Task Manager JVM threads over the course of 12 hours: > http://imgur.com/a/K59hz > Each decrease and subsequent spike in threads corresponds to the job being > restarted due to an AWS Kinesis source error. > Here are the logs from one of the Task Manager instances on startup: > {code} > 2016-08-30 14:52:50,438 WARN org.apache.hadoop.util.NativeCodeLoader > - Unable to load native-hadoop library for your platform... > using builtin-java classes where applicable > 2016-08-30 14:52:50,540 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2016-08-30 14:52:50,540 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC) > 2016-08-30 14:52:50,540 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Current > user: root > 2016-08-30 14:52:50,541 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM: OpenJDK > 64-Bit Server VM - Oracle Corporation - 1.8/25.92-b14 > 2016-08-30 14:52:50,541 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Maximum heap > size: 2048 MiBytes > 2016-08-30 14:52:50,541 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JAVA_HOME: > /usr/lib/jvm/java-1.8-openjdk/jre > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Hadoop > version: 2.7.2 > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - JVM Options: > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:+UseG1GC > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xms2048M > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - -Xmx2048M > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -XX:MaxDirectMemorySize=8388607T > 2016-08-30 14:52:50,543 INFO > 
org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog.file=/usr/local/flink-1.1.1/log/flink--taskmanager-1-ip-10-55-2-218.log > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlog4j.configuration=file:/usr/local/flink-1.1.1/conf/log4j.properties > 2016-08-30 14:52:50,543 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > -Dlogback.configurationFile=file:/usr/local/flink-1.1.1/conf/logback.xml > 2016-08-30 14:52:50,544 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Program > Arguments: > 2016-08-30 14:52:50,544 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > --configDir > 2016-08
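The thread growth reported above was observed through the StatsD metrics reporter. The same counters are exposed inside the JVM via {{ThreadMXBean}}; the sketch below is a self-contained illustration of how one might probe them (class and method names are mine, not Flink's reporter code):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hedged sketch: reading JVM thread counts the way a metrics reporter might.
// This is a standalone illustration, not Flink's StatsD reporter.
public class ThreadCountProbe {

    static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    // Current number of live threads (daemon and non-daemon).
    static int liveThreads() {
        return THREADS.getThreadCount();
    }

    // Highest live thread count since JVM start; a peak that keeps rising
    // across job restarts is the symptom described in FLINK-4536.
    static int peakThreads() {
        return THREADS.getPeakThreadCount();
    }

    public static void main(String[] args) {
        System.out.println("live=" + liveThreads() + " peak=" + peakThreads());
    }
}
```

Sampling these two values periodically (and after each job restart) makes a leak like the one above visible without any external metrics pipeline.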
[jira] [Assigned] (FLINK-3719) WebInterface: Moving the barrier between graph and stats
[ https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-3719: - Assignee: Ivan Mushketyk > WebInterface: Moving the barrier between graph and stats > > > Key: FLINK-3719 > URL: https://issues.apache.org/jira/browse/FLINK-3719 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Niels Basjes >Assignee: Ivan Mushketyk > > It would be really useful if the separator between the graphical view of a > job topology at the top and the textual overview of the counters at the > bottom can be moved up/down. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3719) WebInterface: Moving the barrier between graph and stats
[ https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456480#comment-15456480 ] Ivan Mushketyk commented on FLINK-3719: --- I'll implement this. > WebInterface: Moving the barrier between graph and stats > > > Key: FLINK-3719 > URL: https://issues.apache.org/jira/browse/FLINK-3719 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Niels Basjes > > It would be really useful if the separator between the graphical view of a > job topology at the top and the textual overview of the counters at the > bottom can be moved up/down. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3030) Enhance Dashboard to show Execution Attempts
[ https://issues.apache.org/jira/browse/FLINK-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456474#comment-15456474 ] ASF GitHub Bot commented on FLINK-3030: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2448 @StephanEwen I've renamed the class as you suggested. Regarding the CSS change, I don't know why these changes are added. I have the required (according to README.md) `gulp` version: ``` > $ gulp -version [21:06:35] CLI version 3.9.1 [21:06:35] Local version 3.9.1 ``` I wonder if a `.styl` file was changed in one of the previous commits but it was not compiled to `.css`. > Enhance Dashboard to show Execution Attempts > > > Key: FLINK-3030 > URL: https://issues.apache.org/jira/browse/FLINK-3030 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 0.10.0 >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk > Fix For: 1.0.0 > > > Currently, the web dashboard shows only the latest execution attempt. We > should make all execution attempts and their accumulators available for > inspection. > The REST monitoring API supports this, so it should be a change only to the > frontend part. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2448: [FLINK-3030][web frontend] Enhance dashboard to show exec...
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2448 @StephanEwen I've renamed the class as you suggested. Regarding the CSS change, I don't know why these changes are added. I have the required (according to README.md) `gulp` version: ``` > $ gulp -version [21:06:35] CLI version 3.9.1 [21:06:35] Local version 3.9.1 ``` I wonder if a `.styl` file was changed in one of the previous commits but it was not compiled to `.css`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-4536) Possible thread leak in Task Manager
[ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456462#comment-15456462 ] Scott Kidder edited comment on FLINK-4536 at 9/1/16 8:04 PM: - Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With your help I've been able to fix the Kinesis client disconnects & the thread leaks. First, I implemented a {{close()}} function in the InfluxDBSink to ensure that the OkHttpClient used by the InfluxDB client is shut down. The documentation for the OkHttp library suggests that this isn't necessary, but, hey, I'll do it anyways: http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html
{code}
@Override
public void open(Configuration parameters) throws Exception {
	super.open(parameters);
	this.okHttpClient = new OkHttpClient();
	this.influxDB = InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"),
		System.getenv("INFLUXDB_USERNAME"), System.getenv("INFLUXDB_PASSWORD"),
		new OkClient(okHttpClient));
	this.database = System.getenv("INFLUXDB_DATABASE");
	this.retentionPolicy = System.getenv("INFLUXDB_RP");
	// Flush every 2000 Points, at least every 100ms
	this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
	if (this.okHttpClient != null) {
		this.okHttpClient.getDispatcher().getExecutorService().shutdown();
		this.okHttpClient.getConnectionPool().evictAll();
	}
}
{code}
I continued to see Kinesis client disconnects and thread leaks, although they were happening at a slower rate. I updated the AWS SDK and KCL referenced by the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0: https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff Problem solved! There have been no disconnects for the last 2 hours; previously there were disconnects happening every 30 minutes or so. Also, the number of threads on the Task Managers has been constant.
The following link includes charts covering the last 3 hours. The changes to the AWS SDK dependency was deployed at ~11:50 http://imgur.com/a/yDi4m Thank you again for your help. I'll mark this issue as resolved. was (Author: skidder): Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With your help I've been able to fix the Kinesis client disconnects & the thread leaks. First, I implemented a {code}code(){code} function in the InfluxDBSink to ensure that the {code}OkHttpClient{code} used by the InfluxDB client is shutdown. The documentation for the OkHttp library suggests that this isn't necessary, but, hey, I'll do it anyways: http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html {code} @Override public void open(Configuration parameters) throws Exception { super.open(parameters); this.okHttpClient = new OkHttpClient(); this.influxDB = InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"), System.getenv("INFLUXDB_USERNAME"), System.getenv("INFLUXDB_PASSWORD"), new OkClient(okHttpClient)); this.database = System.getenv("INFLUXDB_DATABASE"); this.retentionPolicy = System.getenv("INFLUXDB_RP"); // Flush every 2000 Points, at least every 100ms this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS); } @Override public void close() throws Exception { if (this.okHttpClient != null) { this.okHttpClient.getDispatcher().getExecutorService().shutdown(); this.okHttpClient.getConnectionPool().evictAll(); } } {code} I continued to see Kinesis client disconnects and thread leaks, although they were happening at a slower rate. I updated the AWS SDK and KCL referenced by the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0: https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff Problem solved! There have been no disconnects for the last 2 hours; previously there were disconnects happening every 30 minutes or so. Also, the number of threads on the Task Managers has been constant. 
The following link includes charts covering the last 3 hours. The changes to the AWS SDK dependency was deployed at ~11:50 http://imgur.com/a/yDi4m Thank you again for your help. I'll mark this issue as resolved. > Possible thread leak in Task Manager > > > Key: FLINK-4536 > URL: https://issues.apache.org/jira/browse/FLINK-4536 >
[jira] [Commented] (FLINK-4536) Possible thread leak in Task Manager
[ https://issues.apache.org/jira/browse/FLINK-4536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456462#comment-15456462 ] Scott Kidder commented on FLINK-4536: - Thank you so much [~tzulitai] and [~StephanEwen] for looking into this! With your help I've been able to fix the Kinesis client disconnects & the thread leaks. First, I implemented a {{close()}} function in the InfluxDBSink to ensure that the {{OkHttpClient}} used by the InfluxDB client is shut down. The documentation for the OkHttp library suggests that this isn't necessary, but, hey, I'll do it anyways: http://square.github.io/okhttp/3.x/okhttp/okhttp3/OkHttpClient.html
{code}
@Override
public void open(Configuration parameters) throws Exception {
	super.open(parameters);
	this.okHttpClient = new OkHttpClient();
	this.influxDB = InfluxDBFactory.connect(System.getenv("INFLUXDB_URL"),
		System.getenv("INFLUXDB_USERNAME"), System.getenv("INFLUXDB_PASSWORD"),
		new OkClient(okHttpClient));
	this.database = System.getenv("INFLUXDB_DATABASE");
	this.retentionPolicy = System.getenv("INFLUXDB_RP");
	// Flush every 2000 Points, at least every 100ms
	this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
}

@Override
public void close() throws Exception {
	if (this.okHttpClient != null) {
		this.okHttpClient.getDispatcher().getExecutorService().shutdown();
		this.okHttpClient.getConnectionPool().evictAll();
	}
}
{code}
I continued to see Kinesis client disconnects and thread leaks, although they were happening at a slower rate. I updated the AWS SDK and KCL referenced by the Flink Kinesis Streaming Connector to use SDK 1.11.30 and KCL 1.7.0: https://github.com/apache/flink/commit/447df42c242cb4dd152c7f5727eb608b0a65d3ff Problem solved! There have been no disconnects for the last 2 hours; previously there were disconnects happening every 30 minutes or so. Also, the number of threads on the Task Managers has been constant.
The following link includes charts covering the last 3 hours. The changes to the AWS SDK dependency were deployed at ~11:50 http://imgur.com/a/yDi4m Thank you again for your help. I'll mark this issue as resolved. > Possible thread leak in Task Manager > > > Key: FLINK-4536 > URL: https://issues.apache.org/jira/browse/FLINK-4536 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.1.0 >Reporter: Scott Kidder > > Running Flink release 1.1.1 commit 61bfb36 in the following configuration: > Job Manager > 2 x Task Manager (2 CPU cores on each Task Manager) > I've also updated the Kinesis source to use the latest AWS Java SDK, release > 1.11.29. > I've got a single Flink application using all 4 slots. It consumes from a > Kinesis stream configured with 2 shards. I've limited the Kinesis source to a > parallelism of 2 as a workaround for FLINK-4341. > Occasionally the Kinesis consumer fails because of provisioned-throughput > limits being hit. The application automatically restarts, and resumes > processing with the checkpoint stored on the Job Manager with no outward > indication of problems. > I recently enabled the StatsD metrics reporter in Flink and noticed that the > number of threads running on each Task Manager increases by about 20 threads > each time the application restarts. Over the course of a day the application > might hit provisioned-throughput limits 20 times or so (this is not fully > production yet, so hitting these limits is acceptable for now). But the > number of threads continues to grow unbounded with no increase in workload on > the Task Managers. > The following link includes charts for the overall Flink cluster performance > & Task Manager JVM threads over the course of 12 hours: > http://imgur.com/a/K59hz > Each decrease and subsequent spike in threads corresponds to the job being > restarted due to an AWS Kinesis source error. 
> Here are the logs from one of the Task Manager instances on startup: > {code} > 2016-08-30 14:52:50,438 WARN org.apache.hadoop.util.NativeCodeLoader > - Unable to load native-hadoop library for your platform... > using builtin-java classes where applicable > 2016-08-30 14:52:50,540 INFO > org.apache.flink.runtime.taskmanager.TaskManager - > > 2016-08-30 14:52:50,540 INFO > org.apache.flink.runtime.taskmanager.TaskManager - Starting > TaskManager (Version: 1.1.1, Rev:61bfb36, Date:09.08.2016 @ 12:09:08 UTC) > 2016-08-30 1
[jira] [Commented] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI
[ https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456392#comment-15456392 ] ASF GitHub Bot commented on FLINK-3680: --- GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2457 [FLINK-3680] Remove "(not set)" text in the Job Plan UI This PR removes the "(not set)" text in the web frontend. Currently it looks like this: https://issues.apache.org/jira/secure/attachment/12796006/Screen%20Shot%202016-03-29%20at%208.13.12%20PM.png Now it looks like: ![fixed labels](https://cloud.githubusercontent.com/assets/592286/18181550/815897c6-7083-11e6-8bbe-9825ac144c83.png) - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink improve-text Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2457.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2457 commit 67db4e7c03787645c45ef3bc95b45060892bb569 Author: Ivan Mushketyk Date: 2016-09-01T19:24:29Z [FLINK-3680] Remove "(not set)" text in the Job Plan UI > Remove or improve (not set) text in the Job Plan UI > --- > > Key: FLINK-3680 > URL: https://issues.apache.org/jira/browse/FLINK-3680 > Project: Flink > Issue Type: Bug > Components: Webfrontend 
>Reporter: Jamie Grier >Assignee: Ivan Mushketyk > Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot > 2016-03-29 at 8.13.12 PM.png > > > When running streaming jobs the UI display (not set) in the UI in a few > different places. This is not the case for batch jobs. > To illustrate I've included screen shots of the UI for the batch and > streaming WordCount examples. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2457: [FLINK-3680] Remove "(not set)" text in the Job Pl...
GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2457 [FLINK-3680] Remove "(not set)" text in the Job Plan UI This PR removes the "(not set)" text in the web frontend. Currently it looks like this: https://issues.apache.org/jira/secure/attachment/12796006/Screen%20Shot%202016-03-29%20at%208.13.12%20PM.png Now it looks like: ![fixed labels](https://cloud.githubusercontent.com/assets/592286/18181550/815897c6-7083-11e6-8bbe-9825ac144c83.png) - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink improve-text Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2457.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2457 commit 67db4e7c03787645c45ef3bc95b45060892bb569 Author: Ivan Mushketyk Date: 2016-09-01T19:24:29Z [FLINK-3680] Remove "(not set)" text in the Job Plan UI --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456319#comment-15456319 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 > According to the design document, netty authentication is also part of this JIRA. Why was it not addressed? The netty layer is addressed as part of web layer authentication (T2-3 & T2-5 combined). Do you see a need to address this to some other part of the code as well? > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2425 > According to the design document, netty authentication is also part of this JIRA. Why was it not addressed? The netty layer is addressed as part of web layer authentication (T2-3 & T2-5 combined). Do you see a need to address this to some other part of the code as well? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
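The shared-secret ("secure cookie") authorization discussed in this thread ultimately rests on comparing a presented secret against the configured one. A minimal sketch of such a check, using only the JDK; the class and method names are illustrative, not Flink's actual code. {{MessageDigest.isEqual}} is used deliberately so the comparison is constant-time and does not leak where the secrets first differ:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hedged sketch of a shared-secret check; names are illustrative,
// not Flink's service-level authorization implementation.
public class CookieCheck {

    // Compare the presented secret against the configured one without
    // leaking timing information (MessageDigest.isEqual is constant-time).
    static boolean cookieMatches(String configured, String presented) {
        if (configured == null || presented == null) {
            return false;
        }
        return MessageDigest.isEqual(
            configured.getBytes(StandardCharsets.UTF_8),
            presented.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(cookieMatches("foo", "foo"));  // true
        System.out.println(cookieMatches("foo", "bar"));  // false
    }
}
```

A naive `String.equals` would return as soon as the first byte differs, which an attacker on a fast network can, in principle, measure; constant-time comparison closes that channel.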
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456282#comment-15456282 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77230955
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) {
 	return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 }
 
+public static void persistAppState(String appId, String cookie) {
+	if(appId == null || cookie == null) { return; }
+	String path = System.getProperty("user.home") + File.separator + fileName;
+	LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+	try {
+		File f = new File(path);
+		if(!f.exists()) {
+			f.createNewFile();
+		}
+		HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+		SubnodeConfiguration subNode = config.getSection(appId);
+		if (subNode.containsKey(cookieKey)) {
+			String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+		subNode.addProperty(cookieKey, cookie);
+		config.save();
+		LOG.debug("Persisted cookie for the appID: {}", appId);
+	} catch(Exception e) {
+		LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e);
+		throw new RuntimeException(e);
+	}
+}
+
+public static String getAppSecureCookie(String appId) {
+	if(appId == null) {
+		String errorMessage = "Application ID cannot be null";
+		LOG.error(errorMessage);
+		throw new RuntimeException(errorMessage);
+	}
+
+	String cookieFromFile;
+	String path = System.getProperty("user.home") + File.separator + fileName;
+	LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path);
+
+	try {
+		File f = new File(path);
+		if (!f.exists()) {
+			String errorMessage = "Could not find the file: " + path + " in user home directory";
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+		HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+		SubnodeConfiguration subNode = config.getSection(appId);
+		if (!subNode.containsKey(cookieKey)) {
+			String errorMessage = "Could not find the app ID section in "+ path + " for the appID: "+ appId;
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+		cookieFromFile = subNode.getString(cookieKey, "");
+		if(cookieFromFile.length() == 0) {
+			String errorMessage = "Could not find cookie in "+ path + " for the appID: "+ appId;
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+	} catch(Exception e) {
+		LOG.error("Exception occurred while fetching cookie for app id: {} Exception: {}", appId, e);
+		throw new RuntimeException(e);
+	}
+
+	LOG.debug("Found cookie for the appID: {}", appId);
+	return cookieFromFile;
+}
+
+public static void removeAppState(String appId) {
+	if(appId == null) { return; }
+	String path = System.getProperty("user.home") + File.separator + fileName;
+	LOG.debug("Going to remove the reference for the appId: {} from {}", appId, path);
+	try {
+		File f = new File(path);
+		if (!f.exists()) {
+			String errorMessage = "Could not find the file: " + path + " in user home directory";
+			LOG.warn(errorMessage);
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77230955
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) {
 	return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 }
 
+public static void persistAppState(String appId, String cookie) {
+	if(appId == null || cookie == null) { return; }
+	String path = System.getProperty("user.home") + File.separator + fileName;
+	LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+	try {
+		File f = new File(path);
+		if(!f.exists()) {
+			f.createNewFile();
+		}
+		HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+		SubnodeConfiguration subNode = config.getSection(appId);
+		if (subNode.containsKey(cookieKey)) {
+			String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+		subNode.addProperty(cookieKey, cookie);
+		config.save();
+		LOG.debug("Persisted cookie for the appID: {}", appId);
+	} catch(Exception e) {
+		LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e);
+		throw new RuntimeException(e);
+	}
+}
+
+public static String getAppSecureCookie(String appId) {
+	if(appId == null) {
+		String errorMessage = "Application ID cannot be null";
+		LOG.error(errorMessage);
+		throw new RuntimeException(errorMessage);
+	}
+
+	String cookieFromFile;
+	String path = System.getProperty("user.home") + File.separator + fileName;
+	LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path);
+
+	try {
+		File f = new File(path);
+		if (!f.exists()) {
+			String errorMessage = "Could not find the file: " + path + " in user home directory";
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+		HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+		SubnodeConfiguration subNode = config.getSection(appId);
+		if (!subNode.containsKey(cookieKey)) {
+			String errorMessage = "Could not find the app ID section in "+ path + " for the appID: "+ appId;
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+		cookieFromFile = subNode.getString(cookieKey, "");
+		if(cookieFromFile.length() == 0) {
+			String errorMessage = "Could not find cookie in "+ path + " for the appID: "+ appId;
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+	} catch(Exception e) {
+		LOG.error("Exception occurred while fetching cookie for app id: {} Exception: {}", appId, e);
+		throw new RuntimeException(e);
+	}
+
+	LOG.debug("Found cookie for the appID: {}", appId);
+	return cookieFromFile;
+}
+
+public static void removeAppState(String appId) {
+	if(appId == null) { return; }
+	String path = System.getProperty("user.home") + File.separator + fileName;
+	LOG.debug("Going to remove the reference for the appId: {} from {}", appId, path);
+	try {
+		File f = new File(path);
+		if (!f.exists()) {
+			String errorMessage = "Could not find the file: " + path + " in user home directory";
+			LOG.warn(errorMessage);
+			return;
+		}
+		HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+		config.clearTree(appId);
+		config.save();
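The reviewed patch persists one secure cookie per YARN application in an INI file under the user's home directory, via Commons Configuration. The same idea can be sketched with only the JDK, using a {{java.util.Properties}} file keyed by application ID. This is a deliberate simplification of the patch, not its actual code; it mirrors the patch's refuse-to-overwrite behavior but drops the INI sections:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

// Hedged sketch: per-application secret storage in the spirit of the
// reviewed patch, using java.util.Properties instead of
// HierarchicalINIConfiguration. Not the connector's real code.
public class AppCookieStore {

    private final File file;

    public AppCookieStore(File file) { this.file = file; }

    public synchronized void persist(String appId, String cookie) throws IOException {
        Properties props = load();
        if (props.containsKey(appId)) {
            // Mirror the patch's behavior: never silently overwrite a cookie.
            throw new IllegalStateException("Secure cookie already stored for appID: " + appId);
        }
        props.setProperty(appId, cookie);
        store(props);
    }

    public synchronized String get(String appId) throws IOException {
        String cookie = load().getProperty(appId);
        if (cookie == null || cookie.isEmpty()) {
            throw new IllegalStateException("No cookie found for appID: " + appId);
        }
        return cookie;
    }

    public synchronized void remove(String appId) throws IOException {
        Properties props = load();
        props.remove(appId);
        store(props);
    }

    private Properties load() throws IOException {
        Properties props = new Properties();
        if (file.exists()) {
            try (InputStream in = new FileInputStream(file)) {
                props.load(in);
            }
        }
        return props;
    }

    private void store(Properties props) throws IOException {
        try (OutputStream out = new FileOutputStream(file)) {
            props.store(out, "per-application secure cookies");
        }
    }
}
```

One design note on the patch itself: throwing when a cookie already exists (rather than returning it) forces the caller to decide explicitly whether a duplicate registration is an error, which is the safer default for a secret store.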
[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"
[ https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4559: --- Priority: Minor (was: Critical) > Kinesis Producer not setting credentials provider properly when > AWS_CREDENTIALS_PROVIDER is "AUTO" > -- > > Key: FLINK-4559 > URL: https://issues.apache.org/jira/browse/FLINK-4559 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.1.0, 1.1.1, 1.1.2 >Reporter: Tzu-Li (Gordon) Tai >Priority: Minor > Fix For: 1.1.3 > > > If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, > {{AWSUtils.getCredentialsProvider}} will return {{null}}, so > {{setCredentialsProvider}} should not be explicitly called on the internally > built {{KinesisProducerConfiguration}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
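Per the issue description, {{AWSUtils.getCredentialsProvider}} returns {{null}} for {{AUTO}}, so calling {{setCredentialsProvider}} unconditionally clobbers the producer's default provider chain. The shape of the fix (guard the setter) can be sketched without the AWS SDK; every type and name below is an illustrative stand-in, not the connector's real classes:

```java
// Hedged sketch of the FLINK-4559 fix shape: only apply an explicitly
// resolved credentials provider; a null result means "let the library's
// default provider chain take over". All types here are stand-ins for
// the real AWS SDK / Flink Kinesis connector classes.
public class CredentialsGuard {

    static class ProducerConfig {
        private Object provider = "DEFAULT_CHAIN";  // stands in for the SDK default chain

        void setCredentialsProvider(Object p) { this.provider = p; }

        Object getProvider() { return provider; }
    }

    // Stand-in for AWSUtils.getCredentialsProvider: AUTO resolves to null.
    static Object resolveProvider(String mode) {
        return "AUTO".equals(mode) ? null : "provider:" + mode;
    }

    static ProducerConfig configure(String mode) {
        ProducerConfig config = new ProducerConfig();
        Object provider = resolveProvider(mode);
        if (provider != null) {  // the guard the issue calls for
            config.setCredentialsProvider(provider);
        }
        return config;
    }
}
```

Without the {{provider != null}} guard, the {{AUTO}} path would set the provider to {{null}} and break credential resolution, which is exactly the reported bug.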
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456245#comment-15456245 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77227679 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.ConfigConstants; +import org.junit.BeforeClass; + +import java.io.IOException; + +import static org.junit.Assert.fail; + +public class BlobClientSecureTest extends BlobClientTest { + + /** +* Starts the BLOB server with secure cookie enabled configuration +*/ + @BeforeClass + public static void startServer() { + try { + config.setBoolean(ConfigConstants.SECURITY_ENABLED, true); + config.setString(ConfigConstants.SECURITY_COOKIE, "foo"); + BLOB_SERVER = new BlobServer(config); + } + catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } --- End diff -- Thanks. Will add the test case. 
> Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456217#comment-15456217 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77226096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -426,4 +440,11 @@ void unregisterConnection(BlobServerConnection conn) { } } + /* Secure cookie to authenticate */ + @Override + public String getSecureCookie() { return secureCookie; } + + /* Flag to indicate if service level authentication is enabled or not */ + public boolean isSecurityEnabled() { return secureCookie != null; } --- End diff -- We expect the secure cookie configuration to be available if security is enabled. A missing value will be reported as an error ahead of time. Are you expecting any other conditions to be met? Could you please elaborate?
[jira] [Commented] (FLINK-3679) DeserializationSchema should handle zero or more outputs for every input
[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456204#comment-15456204 ] Tzu-Li (Gordon) Tai commented on FLINK-3679: +1 to fix the issue; the proposed changes seem reasonable and head towards a better API. The Kinesis consumer will need to adapt to this change as well, as it also accepts DeserializationSchema. > DeserializationSchema should handle zero or more outputs for every input > > > Key: FLINK-3679 > URL: https://issues.apache.org/jira/browse/FLINK-3679 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector >Reporter: Jamie Grier > > There are a couple of issues with the DeserializationSchema API that I think > should be improved. This request has come to me via an existing Flink user. > The main issue is simply that the API assumes that there is a one-to-one > mapping between input and outputs. In reality there are scenarios where one > input message (say from Kafka) might actually map to zero or more logical > elements in the pipeline. > Particularly important here is the case where you receive a message from a > source (such as Kafka) and say the raw bytes don't deserialize properly. > Right now the only recourse is to throw IOException and therefore fail the > job. > This is definitely not good since bad data is a reality and failing the job > is not the right option. If the job fails we'll just end up replaying the > bad data and the whole thing will start again. > Instead in this case it would be best if the user could just return the empty > set. > The other case is where one input message should logically be multiple output > messages. This case is probably less important since there are other ways to > do this but in general it might be good to make the > DeserializationSchema.deserialize() method return a collection rather than a > single element.
> Maybe we need to support a DeserializationSchema variant that has semantics > more like that of FlatMap.
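The flatMap-like variant suggested above could look roughly like the following sketch; the interface name and the List-returning signature are illustrative, not Flink's actual API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical flatMap-style schema: one raw record maps to zero or more elements.
interface FlatDeserializationSchema<T> {
    List<T> deserialize(byte[] message);
}

class LenientStringSchema implements FlatDeserializationSchema<String> {
    @Override
    public List<String> deserialize(byte[] message) {
        if (message == null || message.length == 0) {
            // Bad record: emit nothing instead of throwing and failing the job.
            return Collections.emptyList();
        }
        return Collections.singletonList(new String(message, StandardCharsets.UTF_8));
    }
}
```

Returning an empty list covers the undeserializable-record case, and a multi-element list covers the one-input-to-many-outputs case, both without failing the pipeline.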
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456157#comment-15456157 ] ASF GitHub Bot commented on FLINK-3930: --- Github user vijikarthi commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77221958 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java --- @@ -99,7 +110,43 @@ public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) { currentDecoder.destroy(); currentDecoder = null; } - + + if(secureCookie != null) { --- End diff -- The secure cookie value could be auto-populated (Yarn) or user-provided but will be persisted in the in-memory Flink configuration instance which is passed to the web layer during bootstrap. Should the user decide to turn security off, then we expect the services to be restarted to reflect the change?
[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs
[ https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456065#comment-15456065 ] ramkrishna.s.vasudevan commented on FLINK-3322: --- Reading the code I think I need to update the doc on how the memory pages can be allocated for each task. Will update the doc. Will do some tests before that. > MemoryManager creates too much GC pressure with iterative jobs > -- > > Key: FLINK-3322 > URL: https://issues.apache.org/jira/browse/FLINK-3322 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.0.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FLINK-3322.docx > > > When taskmanager.memory.preallocate is false (the default), released memory > segments are not added to a pool, but the GC is expected to take care of > them. This puts too much pressure on the GC with iterative jobs, where the > operators reallocate all memory at every superstep. > See the following discussion on the mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html > Reproducing the issue: > https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc > The class to start is malom.Solver. If you increase the memory given to the > JVM from 1 to 50 GB, performance gradually degrades by more than 10 times. > (It will generate some lookuptables to /tmp on first run for a few minutes.) > (I think the slowdown might also depend somewhat on > taskmanager.memory.fraction, because more unused non-managed memory results > in rarer GCs.) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
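As the issue description notes, released memory segments are only pooled when preallocation is on; until the MemoryManager itself is fixed, the behavior is driven by two flink-conf.yaml settings. A sketch (values illustrative, not recommendations):

```yaml
# flink-conf.yaml — workaround discussed in the thread: with preallocation on,
# released managed-memory segments stay pooled instead of being left to the GC.
taskmanager.memory.preallocate: true
# A larger managed fraction also leaves less unused non-managed heap,
# which (per the issue) changes GC frequency.
taskmanager.memory.fraction: 0.7
```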
[jira] [Created] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"
Tzu-Li (Gordon) Tai created FLINK-4559: -- Summary: Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO" Key: FLINK-4559 URL: https://issues.apache.org/jira/browse/FLINK-4559 Project: Flink Issue Type: Bug Components: Kinesis Connector Reporter: Tzu-Li (Gordon) Tai Priority: Critical If the {{AWS_CREDENTIAL_PROVIDER}} option is set to {{AUTO}}, {{AWSUtils.getCredentialsProvider}} will return {{null}}, so {{setCredentialsProvider}} should not be explicitly called on the internally built {{KinesisProducerConfiguration}}.
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456048#comment-15456048 ] ASF GitHub Bot commented on FLINK-4458: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 Thanks for the review @StephanEwen. I agree that we've different style preferences concerning method parameters. I also agree that the one parameter per line is a little bit more verbose and requires more vertical scrolling. The best approach would probably be to reduce the number of method parameters by grouping them. > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3947) Provide low level access to RocksDB state backend
[ https://issues.apache.org/jira/browse/FLINK-3947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15456040#comment-15456040 ] Elias Levy commented on FLINK-3947: --- The use case is maintaining a large set of items as state for a keyed stream, where items in the set may be added or removed incrementally as new messages are processed. There are a number of problems with the current RocksDB state backend implementation: - It can handle aggregate state in a keyed stream that won't fit in memory as long as per key state fits in memory, but it can't handle per key state that won't fit in memory. - Even when per key state fits in memory, it is extremely inefficient handling state that consists of a large set or list of items, where a small number of items are added/removed to the set/list each time a message is processed. The current implementation serializes the whole set/list as a single value, and thus deserializes/serializes it all every time a single value is added/removed. An example of such state is an LRU cache for values extracted from a stream, where during each processElement invocation elements may be added and removed from the keyed state. With low level access to RocksDB such an LRU could be easily implemented. Set items would be serialized one per KV in RocksDB. The key would start with the stream partition key, but would also encode the value's timestamp. Expiration is just a matter of iterating over the key range, deleting expired keys, and stopping after observing a value that should not be expired. Alternatively, if the RocksDB TTL functionality were exposed, that could be used. Even if low level access to RocksDB is not provided, Flink needs other implementations of state management for container data types, where the data is stored and accessed more efficiently than serializing/deserializing the whole data structure.
> Provide low level access to RocksDB state backend > - > > Key: FLINK-3947 > URL: https://issues.apache.org/jira/browse/FLINK-3947 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.0.3 >Reporter: Elias Levy > > The current state API is limiting and some implementations are not as > efficient as they could be, particularly when working with large states. For > instance, a ListState is append only. You cannot remove values from the > list. And the RocksDBListState get() implementation reads all list values > from RocksDB instead of returning an Iterable that only reads values as > needed. > Furthermore, RocksDB is an ordered KV store, yet there is no ordered map > state API with an ability to iterate over the stored values in order.
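The key layout sketched in the comment (partition key followed by the value's timestamp, so expiration is a prefix range scan) could be encoded as follows; the class and helper names are hypothetical, and the ordering property holds for non-negative timestamps:

```java
import java.nio.ByteBuffer;

// Illustrative composite key for the LRU idea: partition key bytes followed by a
// big-endian timestamp, so RocksDB's lexicographic key order visits a partition's
// entries oldest-first (for non-negative timestamps).
class TimestampedKey {
    static byte[] encode(byte[] partitionKey, long timestampMillis) {
        return ByteBuffer.allocate(partitionKey.length + Long.BYTES)
                .put(partitionKey)
                .putLong(timestampMillis) // ByteBuffer defaults to big-endian
                .array();
    }

    static long decodeTimestamp(byte[] key, int partitionKeyLength) {
        return ByteBuffer.wrap(key, partitionKeyLength, Long.BYTES).getLong();
    }
}
```

An expiration pass would then iterate keys sharing the partition-key prefix, deleting until it reaches a timestamp newer than the cutoff.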
[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"
[ https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4559: --- Affects Version/s: 1.1.2
[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"
[ https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4559: --- Fix Version/s: 1.1.3
[jira] [Updated] (FLINK-4559) Kinesis Producer not setting credentials provider properly when AWS_CREDENTIALS_PROVIDER is "AUTO"
[ https://issues.apache.org/jira/browse/FLINK-4559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-4559: --- Affects Version/s: 1.1.0, 1.1.1
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455997#comment-15455997 ] ASF GitHub Bot commented on FLINK-4458: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2450#discussion_r77212855 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala --- @@ -69,7 +69,7 @@ abstract class FlinkMiniCluster( ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, InetAddress.getByName("localhost").getHostAddress()) - val configuration = generateConfiguration(userConfiguration) + protected val _configuration = generateConfiguration(userConfiguration) --- End diff -- It's actually the original configuration which will usually have the job manager ipc port set to 0. Thus, in order to return a configuration which points to the current leader, we have to generate a new configuration via `configuration` which inserts the selected port. I'll rename the variable to `originalConfiguration`.
[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455990#comment-15455990 ] ASF GitHub Bot commented on FLINK-4455: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2449 I've addressed your comments @StephanEwen. > Replace ActorGateways in NetworkEnvironment by interfaces > - > > Key: FLINK-4455 > URL: https://issues.apache.org/jira/browse/FLINK-4455 > Project: Flink > Issue Type: Improvement > Components: Network, TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{NetworkEnvironment}} communicates with the outside world > ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the > dependency on actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement > these interfaces as part of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4558) Add support for synchronizing streams
[ https://issues.apache.org/jira/browse/FLINK-4558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455966#comment-15455966 ] Elias Levy commented on FLINK-4558: --- It should be noted that Flink already performs a similar function in the handling of snapshot barriers, where it will pause a stream into an operator with multiple streams upon receiving a barrier, so as to align the two streams at the barrier to generate a snapshot. > Add support for synchronizing streams > - > > Key: FLINK-4558 > URL: https://issues.apache.org/jira/browse/FLINK-4558 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Elias Levy > > As mentioned on the [mailing > list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html], > there are use cases that require synchronizing two streams via their > times and where it is not practical to buffer all messages from one stream > while waiting for the other to synchronize. Flink should add functionality > to enable such use cases. > This could be implemented by modifying TwoInputStreamOperator so that calls > to processElement1 and processElement2 could return a value indicating that > the element can't yet be processed, having the framework then pause > processing for some time, potentially using exponential back off with a hard > maximum, and then allowing the back pressure system to do its work and pause > the stream. > Alternatively, an API could be added to explicitly pause/unpause a stream. > For ease of use either of these mechanisms should be used to create a > SynchronizedTwoInputStreamOperator that end users can utilize by passing a > configurable time delta to use as a synchronization threshold. >
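The "return a value indicating the element can't yet be processed" idea from the issue can be prototyped outside Flink as a simple gate. The `ProcessResult` enum and the drift threshold are hypothetical; they are not part of Flink's TwoInputStreamOperator API:

```java
// Sketch of the proposed back-off contract: each processElementN call reports
// whether its input has drifted too far ahead of the other input's latest
// timestamp, in which case the framework could pause that input.
enum ProcessResult { ACCEPTED, NOT_READY }

class SyncGate {
    private final long maxDriftMillis;          // the "configurable time delta"
    private long maxSeen1 = Long.MIN_VALUE;     // latest timestamp seen on input 1
    private long maxSeen2 = Long.MIN_VALUE;     // latest timestamp seen on input 2

    SyncGate(long maxDriftMillis) { this.maxDriftMillis = maxDriftMillis; }

    ProcessResult processElement1(long timestamp) {
        maxSeen1 = Math.max(maxSeen1, timestamp);
        // Input 1 must back off if it runs ahead of input 2 by more than the threshold.
        return (maxSeen2 != Long.MIN_VALUE && timestamp - maxSeen2 > maxDriftMillis)
                ? ProcessResult.NOT_READY : ProcessResult.ACCEPTED;
    }

    ProcessResult processElement2(long timestamp) {
        maxSeen2 = Math.max(maxSeen2, timestamp);
        return (maxSeen1 != Long.MIN_VALUE && timestamp - maxSeen1 > maxDriftMillis)
                ? ProcessResult.NOT_READY : ProcessResult.ACCEPTED;
    }
}
```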
[jira] [Updated] (FLINK-4461) Ensure all the classes are tagged with suitable annotations
[ https://issues.apache.org/jira/browse/FLINK-4461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ramkrishna.s.vasudevan updated FLINK-4461: -- Attachment: FLINK_annotations.xlsx Just attaching a list of public classes and interfaces that have no annotations attached to them. It may be worth fixing at least the actual public classes which are missing annotations. > Ensure all the classes are tagged with suitable annotations > --- > > Key: FLINK-4461 > URL: https://issues.apache.org/jira/browse/FLINK-4461 > Project: Flink > Issue Type: Improvement >Reporter: ramkrishna.s.vasudevan >Assignee: ramkrishna.s.vasudevan > Attachments: FLINK_annotations.xlsx > > > Currently in Flink we have three annotations > Public > PublicEvolving > Internal. > But some of the classes, though they are public, are not tagged. They may be > even advanced features but still tagging them could help the user to know > which are public facing and which are Internal API/interfaces. > I just ran a sample util in streaming-java package and I got these > {code} > class org.apache.flink.streaming.runtime.operators.CheckpointCommitter > class > org.apache.flink.streaming.api.functions.source.FileMonitoringFunction$WatchType > interface org.apache.flink.streaming.api.functions.TimestampExtractor > class > org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows > class org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet > class org.apache.flink.streaming.api.windowing.triggers.TriggerResult > class > org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor > class org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator > class > org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink$ExactlyOnceState > interface > org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks > class > org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows > 
interface > org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction > interface > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet$MergeFunction > class org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider > class > org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema > class org.apache.flink.streaming.api.functions.source.FileReadFunction > class > org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows > class org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask > class org.apache.flink.streaming.api.functions.source.FileMonitoringFunction > class org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput > class org.apache.flink.streaming.api.functions.IngestionTimeExtractor > class > org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction > interface org.apache.flink.streaming.api.functions.TimestampAssigner > class org.apache.flink.streaming.api.operators.StoppableStreamSource > class org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink > class > org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction > class org.apache.flink.streaming.util.HDFSCopyToLocal > class > org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator > class org.apache.flink.streaming.api.collector.selector.DirectedOutput > class org.apache.flink.streaming.runtime.tasks.TimeServiceProvider > class 
org.apache.flink.streaming.util.HDFSCopyFromLocal > class > org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows > {code} > These classes are simply not tagged. In the above example TimeStampAssigner > should fall in @Public tag I believe. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
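The tagging the issue asks for can also be audited mechanically, much like the sample utility mentioned above. Below is a hedged, self-contained sketch: the real annotations live in Flink's flink-annotations module, so a simplified `@Public` stand-in and a trimmed `TimestampAssigner` are defined locally here purely for illustration.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Simplified stand-in for Flink's @Public stability annotation (the real one
// lives in the flink-annotations module); defined locally so this sketch
// compiles on its own. RUNTIME retention lets an audit tool see it via reflection.
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Public {}

// Illustrative, trimmed version of one of the untagged interfaces from the
// list above, now carrying the stability tag.
@Public
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long previousElementTimestamp);
}

public class AnnotationCheck {
    public static void main(String[] args) {
        // A tiny audit like the sample utility described in the issue:
        // check whether a public-facing type carries a stability annotation.
        boolean tagged = TimestampAssigner.class.isAnnotationPresent(Public.class);
        System.out.println("TimestampAssigner tagged: " + tagged);
    }
}
```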
[jira] [Created] (FLINK-4558) Add support for synchronizing streams
Elias Levy created FLINK-4558: - Summary: Add support for synchronizing streams Key: FLINK-4558 URL: https://issues.apache.org/jira/browse/FLINK-4558 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.1.0 Reporter: Elias Levy As mentioned on the [mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html], there are use cases that require synchronizing two streams via their timestamps, where it is not practical to buffer all messages from one stream while waiting for the other to catch up. Flink should add functionality to enable such use cases. This could be implemented by modifying TwoInputStreamOperator so that calls to processElement1 and processElement2 can return a value indicating that the element cannot yet be processed, with the framework then pausing processing for some time, potentially using exponential back-off with a hard maximum, and letting the back-pressure system do its work and pause the stream. Alternatively, an API could be added to explicitly pause/unpause a stream. For ease of use, either of these mechanisms should be used to create a SynchronizedTwoInputStreamOperator that end users can utilize by passing a configurable time delta to use as a synchronization threshold. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
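The proposal can be illustrated with a toy sketch. None of this is Flink API: `StreamSynchronizer`, the `Decision` enum, and the assumption that both streams start at time 0 are all hypothetical, standing in for a `processElementN` return value combined with the configurable time-delta threshold the issue describes.

```java
// Toy sketch (not the Flink API) of the synchronization idea: each call to
// processElementN reports whether the element can be handled yet, based on how
// far its stream's time has run ahead of the other stream's. maxTimeDelta is
// the configurable synchronization threshold mentioned in the issue. Both
// streams are assumed to start at time 0.
public class StreamSynchronizer {
    enum Decision { PROCESS, PAUSE }

    private final long maxTimeDelta;
    private long time1 = 0;
    private long time2 = 0;

    public StreamSynchronizer(long maxTimeDelta) {
        this.maxTimeDelta = maxTimeDelta;
    }

    public Decision processElement1(long timestamp) {
        time1 = Math.max(time1, timestamp);
        // Pause stream 1 if it has run more than maxTimeDelta ahead of stream 2;
        // the framework could then back off exponentially before retrying.
        return (time1 - time2 > maxTimeDelta) ? Decision.PAUSE : Decision.PROCESS;
    }

    public Decision processElement2(long timestamp) {
        time2 = Math.max(time2, timestamp);
        return (time2 - time1 > maxTimeDelta) ? Decision.PAUSE : Decision.PROCESS;
    }

    public static void main(String[] args) {
        StreamSynchronizer sync = new StreamSynchronizer(10);
        System.out.println(sync.processElement1(5));   // stream 1 only 5 ahead
        System.out.println(sync.processElement1(20));  // stream 1 now 20 ahead: back off
        System.out.println(sync.processElement2(15));  // stream 2 catches up
        System.out.println(sync.processElement1(20));  // within the threshold again
    }
}
```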
[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2449 Thanks for the quick and thorough review @StephanEwen. - Concerning the `ExecutionContext` in the `TaskManager`: This is simply the `ExecutionContext` which was passed before to the `NetworkEnvironment` but now is no longer needed there. I agree that we should be careful with thread pool creations. At the moment, we create for each `JobManager` and `TaskManager` an additional execution context. The advantage of this approach is that we separate the actor execution from the execution of other components. Thus, it's not possible that another component starves the execution context of the actor and thus influences its responsiveness. But I agree that the few additional future callbacks shouldn't be a big burden for the `TaskManager` actor system. Will change it. - It is true that there is no notion of slots on the `TaskManager` right now. I've named it `SlotEnvironment` because with the Flip-6 refactorings we need to maintain multiple job manager connections and every task runs in a slot which is associated with a JobManager. Renaming it to `JobManagerConnection` should better reflect what it actually is. - Yes that is a good idea. Will try to attach the `ResultPartitionConsumableNotifier` to the `Task`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455878#comment-15455878 ] ASF GitHub Bot commented on FLINK-4455: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2449 Thanks for the quick and thorough review @StephanEwen. - Concerning the `ExecutionContext` in the `TaskManager`: This is simply the `ExecutionContext` which was passed before to the `NetworkEnvironment` but now is no longer needed there. I agree that we should be careful with thread pool creations. At the moment, we create for each `JobManager` and `TaskManager` an additional execution context. The advantage of this approach is that we separate the actor execution from the execution of other components. Thus, it's not possible that another component starves the execution context of the actor and thus influences its responsiveness. But I agree that the few additional future callbacks shouldn't be a big burden for the `TaskManager` actor system. Will change it. - It is true that there is no notion of slots on the `TaskManager` right now. I've named it `SlotEnvironment` because with the Flip-6 refactorings we need to maintain multiple job manager connections and every task runs in a slot which is associated with a JobManager. Renaming it to `JobManagerConnection` should better reflect what it actually is. - Yes that is a good idea. Will try to attach the `ResultPartitionConsumableNotifier` to the `Task`. > Replace ActorGateways in NetworkEnvironment by interfaces > - > > Key: FLINK-4455 > URL: https://issues.apache.org/jira/browse/FLINK-4455 > Project: Flink > Issue Type: Improvement > Components: Network, TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{NetworkEnvironment}} communicates with the outside world > ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the > dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > expose the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement > these interfaces as part of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
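The refactoring the issue describes can be sketched in a few lines. All names here (`TaskManagerGateway`, `ActorTaskManagerGateway`, the `FailTask` message string) are hypothetical, not the actual Flink classes; the point is only the shape: callers depend on a small interface, and the actor-based implementation wraps each method call in a message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposal: components such as the NetworkEnvironment
// depend on an interface instead of an ActorGateway.
interface TaskManagerGateway {
    void failTask(String executionAttemptId, String cause);
}

// Actor-backed implementation: turns the method call into a message and
// "sends" it. The message list stands in for ActorGateway.tell().
class ActorTaskManagerGateway implements TaskManagerGateway {
    final List<String> sentMessages = new ArrayList<>();

    @Override
    public void failTask(String executionAttemptId, String cause) {
        sentMessages.add("FailTask(" + executionAttemptId + ", " + cause + ")");
    }
}

public class GatewaySketch {
    public static void main(String[] args) {
        ActorTaskManagerGateway gateway = new ActorTaskManagerGateway();
        // The calling component only sees the interface; in Flip-6 an RPC
        // endpoint could implement the same interface directly.
        TaskManagerGateway taskManager = gateway;
        taskManager.failTask("attempt-42", "ProducerFailedException");
        System.out.println(gateway.sentMessages.get(0));
    }
}
```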
[jira] [Assigned] (FLINK-4549) Test and document implicitly supported SQL functions
[ https://issues.apache.org/jira/browse/FLINK-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-4549: --- Assignee: Timo Walther > Test and document implicitly supported SQL functions > > > Key: FLINK-4549 > URL: https://issues.apache.org/jira/browse/FLINK-4549 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > Calcite supports many SQL functions by translating them into {{RexNode}} s. > However, SQL functions like {{NULLIF}} and {{OVERLAPS}} are neither tested nor > documented, although supported. > These functions should be tested and added to the documentation. We could > adapt parts of the Calcite documentation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
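As a reference while writing the tests the issue asks for, the semantics of the two functions it names can be rendered in plain Java. This is a hedged sketch: `nullIf` and `overlaps` are illustrative helpers, and `overlaps` uses the common strict interpretation in which periods sharing only an endpoint do not overlap.

```java
// Plain-Java rendering of two implicitly supported SQL functions:
// NULLIF(a, b) yields NULL when the arguments are equal, otherwise a.
// (s1, e1) OVERLAPS (s2, e2) is true when the two periods share some interior
// point (strict interpretation: touching endpoints do not count).
public class SqlFunctionSemantics {
    static <T> T nullIf(T a, T b) {
        return a.equals(b) ? null : a;
    }

    static boolean overlaps(long s1, long e1, long s2, long e2) {
        return s1 < e2 && s2 < e1;
    }

    public static void main(String[] args) {
        System.out.println(nullIf(5, 5));            // equal arguments: null
        System.out.println(nullIf(5, 3));            // unequal: first argument
        System.out.println(overlaps(1, 10, 5, 20));  // periods intersect
        System.out.println(overlaps(1, 4, 5, 20));   // disjoint periods
    }
}
```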
[jira] [Commented] (FLINK-4485) Finished jobs in yarn session fill /tmp filesystem
[ https://issues.apache.org/jira/browse/FLINK-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455755#comment-15455755 ] Niels Basjes commented on FLINK-4485: - I have tried to create a minimal application that reproduces the problem I see.
# Get the Flink 1.1.1 Scala 2.10 binary for Linux.
# Manually update yarn-session.sh to the latest in master to fix the HBase classpath issue.
# Make sure you have HBase running and configured properly (i.e. HBASE_CONF_DIR and HADOOP_CONF_DIR are set up correctly in your environment).
# Create a table called {{test}} in HBase with at least 1 row in it.
# Start {{./flink-1.1.1/bin/yarn-session.sh -n2 -s5 -d}}
# Get this test project and build it: https://github.com/nielsbasjes/Reproduce-FLINK-4485
# Run the jar several times with something like {{./flink-1.1.1/bin/flink run target/FLINK-4485-1.0-SNAPSHOT.jar}}
# Now, on the Hadoop node running the jobmanager, run {{lsof | fgrep blob}}; you should see the deleted files as shown before.
This reproduction path works on my machine ... > Finished jobs in yarn session fill /tmp filesystem > -- > > Key: FLINK-4485 > URL: https://issues.apache.org/jira/browse/FLINK-4485 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.1.0 >Reporter: Niels Basjes >Priority: Blocker > > On a Yarn cluster I start a yarn-session with a few containers and task slots. > Then I fire a 'large' number of Flink batch jobs in sequence against this > yarn session. It is the exact same job (Java code), yet it gets different > parameters. > In this scenario it is exporting HBase tables to files in HDFS, and the > parameters specify which data from which tables and the name of the target > directory. > After running several dozen jobs, the job submissions started to fail and we > investigated. > We found that the cause was that on the Yarn node which was hosting the > jobmanager, the /tmp file system was full (4GB was 100% full).
> However, the output of {{du -hcs /tmp}} showed only 200MB in use. > We found that a very large file (we guess it is the jar of the job) was put > in /tmp, used, and deleted, yet the file handle was not closed by the jobmanager. > As soon as we killed the jobmanager the disk space was freed. > The summary of the impact is that a yarn-session that receives enough > jobs brings down the Yarn node for all users. > See parts of the output we got from {{lsof}} below.
> {code}
> COMMAND  PID    USER     FD    TYPE  DEVICE  SIZE      NODE  NAME
> java     15034  nbasjes  550r  REG   253,17  66219695  245   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0003 (deleted)
> java     15034  nbasjes  551r  REG   253,17  66219695  252   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0007 (deleted)
> java     15034  nbasjes  552r  REG   253,17  66219695  267   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0012 (deleted)
> java     15034  nbasjes  553r  REG   253,17  66219695  250   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0005 (deleted)
> java     15034  nbasjes  554r  REG   253,17  66219695  288   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0018 (deleted)
> java     15034  nbasjes  555r  REG   253,17  66219695  298   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0025 (deleted)
> java     15034  nbasjes  557r  REG   253,17  66219695  254   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0008 (deleted)
> java     15034  nbasjes  558r  REG   253,17  66219695  292   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0019 (deleted)
> java     15034  nbasjes  559r  REG   253,17  66219695  275   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0013 (deleted)
> java     15034  nbasjes  560r  REG   253,17  66219695  159   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0002 (deleted)
> java     15034  nbasjes  562r  REG   253,17  66219695  238   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0001 (deleted)
> java     15034  nbasjes  568r  REG   253,17  66219695  246   /tmp/blobStore-fbe9c4cf-1f85-48cb-aad9-180e8d4ec7ce/incoming/temp-0004 (deleted)
> java     15034  nbasjes  569r  REG   253,17  66219695  255   /tmp/blobStore-
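The failure mode in this report, disk space held by files that are deleted but still open, can be reproduced in a few lines of plain Java. This is a sketch, not Flink code: the file name and size are illustrative, and the behavior described in the comments assumes a POSIX filesystem such as the one backing /tmp on Linux.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;

// Minimal reproduction of the leak pattern described above: a file that is
// deleted while a stream still holds it open keeps consuming disk space until
// the handle is closed, which is why killing the JobManager freed the space.
public class DeletedButOpen {
    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("blob-demo", ".tmp");
        try (FileOutputStream out = new FileOutputStream(f)) {
            out.write(new byte[1024 * 1024]); // write 1 MB of data
        }
        FileInputStream in = new FileInputStream(f); // handle stays open
        System.out.println("deleted: " + f.delete());
        // On Linux, `lsof` would now show this file as "(deleted)", and the
        // blocks are not reclaimed until in.close() runs.
        System.out.println("still readable bytes: " + in.available());
        in.close(); // only now is the disk space actually freed
    }
}
```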
[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455751#comment-15455751 ] ASF GitHub Bot commented on FLINK-4455: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2449#discussion_r77195218 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -18,130 +18,88 @@ package org.apache.flink.runtime.io.network; -import akka.dispatch.OnFailure; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; -import org.apache.flink.runtime.messages.TaskMessages.FailTask; -import org.apache.flink.runtime.query.KvStateID; -import 
org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import scala.Tuple2; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Network I/O components of each {@link TaskManager} instance. The network environment contains * the data structures that keep track of all intermediate results and all data exchanges. - * - * When initialized, the NetworkEnvironment will allocate the network buffer pool. - * All other components (netty, intermediate result managers, ...) are only created once the - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the - * TaskManager actor gets created and registers itself at the JobManager. 
*/ public class NetworkEnvironment { private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class); private final Object lock = new Object(); - private final NetworkEnvironmentConfiguration configuration; - - private final FiniteDuration jobManagerTimeout; - private final NetworkBufferPool networkBufferPool; - private ConnectionManager connectionManager; + private final ConnectionManager connectionManager; - private ResultPartitionManager partitionManager; + private final ResultPartitionManager resultPartitionManager; - private TaskEventDispatcher taskEventDispatcher; - - private ResultPartitionConsumableNotifier partitionConsumableNotifier; - - private PartitionStateChecker partitionStateChecker; + private final TaskEventDispatcher taskEventDispatcher; /** Server for {@link org.apache.flink.runtime.state.KvState} requests. */ - private KvStateServer kvStateServer; + private
[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455743#comment-15455743 ] ASF GitHub Bot commented on FLINK-4455: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2449#discussion_r77194531 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -476,6 +252,29 @@ public void unregisterTask(Task task) { } } + public void start() throws IOException { + synchronized (lock) { + LOG.info("Starting the network environment and its components."); --- End diff -- Yes that's true. Will change it. > Replace ActorGateways in NetworkEnvironment by interfaces > - > > Key: FLINK-4455 > URL: https://issues.apache.org/jira/browse/FLINK-4455 > Project: Flink > Issue Type: Improvement > Components: Network, TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{NetworkEnvironment}} communicates with the outside world > ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the > dependency on actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > expose the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement > these interfaces as part of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455740#comment-15455740 ] ASF GitHub Bot commented on FLINK-4455: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2449#discussion_r77194470 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -18,130 +18,88 @@ package org.apache.flink.runtime.io.network; -import akka.dispatch.OnFailure; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; -import org.apache.flink.runtime.messages.TaskMessages.FailTask; -import org.apache.flink.runtime.query.KvStateID; -import 
org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import scala.Tuple2; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Network I/O components of each {@link TaskManager} instance. The network environment contains * the data structures that keep track of all intermediate results and all data exchanges. - * - * When initialized, the NetworkEnvironment will allocate the network buffer pool. - * All other components (netty, intermediate result managers, ...) are only created once the - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the - * TaskManager actor gets created and registers itself at the JobManager. 
*/ public class NetworkEnvironment { private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class); private final Object lock = new Object(); - private final NetworkEnvironmentConfiguration configuration; - - private final FiniteDuration jobManagerTimeout; - private final NetworkBufferPool networkBufferPool; - private ConnectionManager connectionManager; + private final ConnectionManager connectionManager; - private ResultPartitionManager partitionManager; + private final ResultPartitionManager resultPartitionManager; - private TaskEventDispatcher taskEventDispatcher; - - private ResultPartitionConsumableNotifier partitionConsumableNotifier; - - private PartitionStateChecker partitionStateChecker; + private final TaskEventDispatcher taskEventDispatcher; /** Server for {@link org.apache.flink.runtime.state.KvState} requests. */ - private KvStateServer kvStateServer; + private
[GitHub] flink pull request #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make Networ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2449#discussion_r77194531 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -476,6 +252,29 @@ public void unregisterTask(Task task) { } } + public void start() throws IOException { + synchronized (lock) { + LOG.info("Starting the network environment and its components."); --- End diff -- Yes that's true. Will change it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
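The discussion above concerns a `start()` method guarded by the environment's lock. A minimal sketch of that pattern is below; the class and field names are invented for illustration and are not Flink's actual code:

```java
// Hypothetical sketch: an idempotent start() guarded by a lock, in the spirit
// of the NetworkEnvironment change under review. All names are invented.
public class GuardedComponent {

	private final Object lock = new Object();

	private boolean started;

	public void start() {
		synchronized (lock) {
			if (started) {
				return; // repeated calls are harmless no-ops
			}
			// ... start sub-components (connection manager, KvState server, ...) ...
			started = true;
		}
	}

	public boolean isStarted() {
		synchronized (lock) {
			return started;
		}
	}
}
```

Holding the lock makes concurrent `start()` calls safe, and the `started` flag makes the operation idempotent.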
[jira] [Commented] (FLINK-4459) Introduce SlotProvider for Scheduler
[ https://issues.apache.org/jira/browse/FLINK-4459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455704#comment-15455704 ] ASF GitHub Bot commented on FLINK-4459: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2424 I will try and rebase/merge this on top of #2447 and add this to the `flip-6` branch as soon as #2447 is approved. > Introduce SlotProvider for Scheduler > > > Key: FLINK-4459 > URL: https://issues.apache.org/jira/browse/FLINK-4459 > Project: Flink > Issue Type: Improvement > Components: Scheduler >Reporter: Till Rohrmann >Assignee: Kurt Young > > Currently the {{Scheduler}} maintains a queue of available instances which it > scans if it needs a new slot. If it finds a suitable instance (having free > slots available) it will allocate a slot from it. > This slot allocation logic can be factored out and be made available via a > {{SlotProvider}} interface. The {{SlotProvider}} has methods to allocate a > slot given a set of location preferences. Slots should be returned as > {{Futures}}, because in the future the slot allocation might happen > asynchronously (Flip-6). > In the first version, the {{SlotProvider}} implementation will simply > encapsulate the existing slot allocation logic extracted from the > {{Scheduler}}. When a slot is requested it will return a completed or failed > future since the allocation happens synchronously. > The refactoring will have the advantage to simplify the {{Scheduler}} class > and to pave the way for upcoming refactorings (Flip-6). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
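The ticket's "first version" behaviour — synchronous allocation returned as an already-completed (or already-failed) future — can be sketched as follows. The types here are simplified stand-ins (a `String` in place of a real slot class), not the actual Flink API:

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the SlotProvider idea described in the issue.
interface SlotProvider {
	// Returns the allocated slot as a future, so the same contract can later
	// be served by an asynchronous allocator (Flip-6).
	CompletableFuture<String> allocateSlot(Collection<String> locationPreferences);
}

// Synchronous first version: the future is returned already completed or failed.
class SynchronousSlotProvider implements SlotProvider {
	@Override
	public CompletableFuture<String> allocateSlot(Collection<String> locationPreferences) {
		String slot = findFreeSlot(locationPreferences);
		if (slot == null) {
			CompletableFuture<String> failed = new CompletableFuture<>();
			failed.completeExceptionally(new IllegalStateException("no slot available"));
			return failed;
		}
		return CompletableFuture.completedFuture(slot);
	}

	private String findFreeSlot(Collection<String> preferences) {
		// The real Scheduler would scan its queue of instances with free slots;
		// a dummy value keeps this sketch self-contained.
		return "slot-0";
	}
}
```

Callers written against the future-based contract need no change when the allocation later becomes genuinely asynchronous.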
[GitHub] flink issue #2424: [FLINK-4459][Scheduler] Introduce SlotProvider for Schedu...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2424

I will try and rebase/merge this on top of #2447 and add this to the `flip-6` branch as soon as #2447 is approved.
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2456

[FLINK-4456] Replace ActorGateway in Task and RuntimeEnvironment

Replaces the `ActorGateway` in `Task` and `RuntimeEnvironment` by interfaces to decouple these components from the actors.
- Introduces a `TaskExecutionStateListener` interface for `TaskExecutionState` update messages
- Replaces the job manager `ActorGateway` by `InputSplitProvider` and `CheckpointNotifier`
- Replaces the task manager `ActorGateway` by `TaskManagerConnection`

The implementations using the `ActorGateways` are
- `InputSplitProvider` --> `TaskInputSplitProvider`
- `TaskExecutionStateListener` --> `ActorGatewayTaskExecutionStateListener`
- `CheckpointNotifier` --> `ActorGatewayCheckpointNotifier`
- `TaskManagerConnection` --> `ActorGatewayTaskManagerConnection`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink FLINK-4456

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2456.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2456

commit 578b2ef0bacc57f093150e4addd56b833ebbdf05
Author: Till Rohrmann
Date: 2016-09-01T12:41:44Z
[FLINK-4456] Introduce TaskExecutionStateListener for Task

commit bd85b45e61160e5d86578da1e52c60ef1bde7c10
Author: Till Rohrmann
Date: 2016-09-01T13:50:36Z
Replace JobManagerGateway in Task by InputSplitProvider and CheckpointNotifier

commit d665583ca03e6748187ada46ce30c39704b17fd8
Author: Till Rohrmann
Date: 2016-09-01T14:36:31Z
Replace the TaskManager ActorGateway by TaskManagerConnection in Task
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455674#comment-15455674 ] ASF GitHub Bot commented on FLINK-4456: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2456 [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvironment Replaces the `ActorGateway` in `Task` and `RuntimeEnvironment` by interfaces to decouple these components from the actors. - Introduces a `TaskExecutionStateListener` interface for `TaskExecutionState` update messages - Replaces the job manager `ActorGateway` by `InputSplitProvider` and `CheckpointNotifier` - Replaces the task manager `ActorGateway` by `TaskManagerConnection` The implementations using the `ActorGateways` are - `InputSplitProvider` --> `TaskInputSplitProvider` - `TaskExecutionStateListener` --> `ActorGatewayTaskExecutionStateListener` - `CheckpointNotifier` --> `ActorGatewayCheckpointNotifier` - `TaskManagerConnection` --> `ActorGatewayTaskManagerConnection` You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink FLINK-4456 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2456.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2456 commit 578b2ef0bacc57f093150e4addd56b833ebbdf05 Author: Till Rohrmann Date: 2016-09-01T12:41:44Z [FLINK-4456] Introduce TaskExecutionStateListener for Task commit bd85b45e61160e5d86578da1e52c60ef1bde7c10 Author: Till Rohrmann Date: 2016-09-01T13:50:36Z Replace JobManagerGateway in Task by InputSplitProvider and CheckpointNotifier commit d665583ca03e6748187ada46ce30c39704b17fd8 Author: Till Rohrmann Date: 2016-09-01T14:36:31Z Replace the TaskManager ActorGateway by TaskManagerConnection in Task > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 
> URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
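The decoupling described above — the `Task` depending on a narrow interface whose current implementation wraps each call into an actor message — follows the pattern sketched below. The names and the single method are invented for illustration (the real interfaces are the ones listed in the pull request), and a plain `Consumer` stands in for the `ActorGateway`:

```java
// Sketch of the decoupling pattern: the Task depends only on this interface...
interface TaskManagerConnection {
	void updateTaskExecutionState(String state);
}

// ...and one implementation turns each call into a message send. Here the
// "gateway" is a java.util.function.Consumer standing in for the actor.
class MessageSendingConnection implements TaskManagerConnection {

	private final java.util.function.Consumer<String> gateway;

	MessageSendingConnection(java.util.function.Consumer<String> gateway) {
		this.gateway = gateway;
	}

	@Override
	public void updateTaskExecutionState(String state) {
		// Wrap the method call in a message and hand it to the gateway.
		gateway.accept("UpdateTaskExecutionState(" + state + ")");
	}
}
```

In Flip-6, a component such as the `JobMaster` can implement the same interface directly, and the `Task` never notices the difference.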
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455668#comment-15455668 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2425 According to the design document, netty authentication is also part of this JIRA. Why was it not addressed? > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
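As context for the shared-secret cookie mechanism described above: on the receiving side, such a check usually reduces to comparing the presented secret against the expected one in constant time. A minimal sketch — not Flink's actual implementation — could look like this:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Hypothetical sketch of a shared-secret cookie check (not the Flink code).
// MessageDigest.isEqual compares the byte arrays in constant time, avoiding
// the timing side channel a plain String.equals could expose.
public final class SecretCookieCheck {

	private SecretCookieCheck() {}

	public static boolean matches(String expected, String presented) {
		if (expected == null || presented == null) {
			return false;
		}
		return MessageDigest.isEqual(
				expected.getBytes(StandardCharsets.UTF_8),
				presented.getBytes(StandardCharsets.UTF_8));
	}
}
```

A connection presenting a non-matching cookie would then be rejected before any further messages are processed.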
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2425

According to the design document, netty authentication is also part of this JIRA. Why was it not addressed?
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455664#comment-15455664 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2425 I'm done with my initial review. If you have a minute @mxm, it would be good if you could check the CliFrontend changes, to see if they fit the architecture well. > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2425

I'm done with my initial review. If you have a minute @mxm, it would be good if you could check the CliFrontend changes, to see if they fit the architecture well.
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455662#comment-15455662 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2425

I manually tested the code. TaskManagers are properly rejected on mismatching cookies; it works when they match. One thing I found was that the error reporting is not very good:
```
robert@robert-da ...k-1.2-SNAPSHOT-bin/flink-1.2-SNAPSHOT (git)-[FLINK-3930] % ./bin/flink run -k asdf ./examples/batch/WordCount.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 0816b6a3427cdcfc1160655edc2cef09. Waiting for job completion.

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:380)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:367)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
	at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:322)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1003)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:292)
	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:377)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
	... 19 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
	at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin
[GitHub] flink issue #2425: FLINK-3930 Added shared secret based authorization for Fl...
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2425

I manually tested the code. TaskManagers are properly rejected on mismatching cookies; it works when they match. One thing I found was that the error reporting is not very good:
```
robert@robert-da ...k-1.2-SNAPSHOT-bin/flink-1.2-SNAPSHOT (git)-[FLINK-3930] % ./bin/flink run -k asdf ./examples/batch/WordCount.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: 0816b6a3427cdcfc1160655edc2cef09. Waiting for job completion.

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:92)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:380)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:367)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:896)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
	at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:92)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:322)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1003)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
	at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:292)
	at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:377)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
	... 19 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
	at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:207)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:90)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:70)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
But I think that this is akka's fault, not ours.
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77181460

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java ---
@@ -597,6 +610,11 @@ public static ContainerLaunchContext createTaskManagerContext(
 	containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername);

+	final String secureCookie = ENV.get(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE);
+	if(secureCookie != null) {
+		containerEnv.put(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE, secureCookie);

--- End diff --

The problem here is that the secure cookie will be put into the environment of the TaskManager JVM, so it'll be quite easy to just read the environment variables (not sure if that is an issue). Another issue is that YARN is by default launching processes by creating a temporary bash file, with all the environment variables and the JVM invocation. So the secure cookie will be written into some tmp directory on YARN. I wonder if there's some infrastructure in YARN to transfer the tokens in a secure way.
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455536#comment-15455536 ] ASF GitHub Bot commented on FLINK-4458: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2450 I think these changes are good. As a very personal and biased statement: I think the code overdoes it a bit with the "every parameter on a new line" policy. We have beautiful wide screens and I get sore fingers from scrolling up and down again to figure out enough context of a code passage ;-) Also, the style breaks the "visual" separation between different parts of statement, like "members of a return tuple" vs. "function parameters", or "clauses of an if statement" and "body of the if statement". I know the idea of that pattern was to improve code readability, but I am getting the feeling this is approaching a "verschlimmbesserung" (an improvement that makes things worse). > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
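The "port 0" mechanism the ticket relies on — binding to port 0 so the OS picks a free ephemeral port and parallel test forks cannot collide — looks like this in plain Java (the helper class name is ours, for illustration):

```java
import java.io.IOException;
import java.net.ServerSocket;

// Binding a socket to port 0 asks the OS for any free ephemeral port;
// the actual port is then read back and can be handed to the test cluster.
public class EphemeralPort {

	public static int pickFreePort() throws IOException {
		try (ServerSocket socket = new ServerSocket(0)) {
			return socket.getLocalPort();
		}
	}
}
```

Because each fork binds its own port at startup rather than agreeing on one in advance, there are no conflicting port requests between concurrently running test JVMs.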
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455534#comment-15455534 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77182333

--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) {
 	return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
 }

+	public static void persistAppState(String appId, String cookie) {
+		if(appId == null || cookie == null) { return; }
+		String path = System.getProperty("user.home") + File.separator + fileName;
+		LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path);
+		try {
+			File f = new File(path);
+			if(!f.exists()) {
+				f.createNewFile();
+			}
+			HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+			SubnodeConfiguration subNode = config.getSection(appId);
+			if (subNode.containsKey(cookieKey)) {
+				String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId;
+				LOG.error(errorMessage);
+				throw new RuntimeException(errorMessage);
+			}
+			subNode.addProperty(cookieKey, cookie);
+			config.save();
+			LOG.debug("Persisted cookie for the appID: {}", appId);
+		} catch(Exception e) {
+			LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e);
+			throw new RuntimeException(e);
+		}
+	}
+
+	public static String getAppSecureCookie(String appId) {
+		if(appId == null) {
+			String errorMessage = "Application ID cannot be null";
+			LOG.error(errorMessage);
+			throw new RuntimeException(errorMessage);
+		}
+
+		String cookieFromFile;
+		String path = System.getProperty("user.home") + File.separator + fileName;
+		LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path);
+
+		try {
+			File f = new File(path);
+			if (!f.exists()) {
+				String errorMessage = "Could not find the file: " + path + " in user home directory";
+				LOG.error(errorMessage);
+				throw new RuntimeException(errorMessage);
+			}
+			HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path);
+			SubnodeConfiguration subNode = config.getSection(appId);
+			if (!subNode.containsKey(cookieKey)) {
+				String errorMessage = "Could not find the app ID section in "+ path + " for the appID: "+ appId;
+				LOG.error(errorMessage);
+				throw new RuntimeException(errorMessage);
+			}
+			cookieFromFile = subNode.getString(cookieKey, "");
+			if(cookieFromFile.length() == 0) {
+				String errorMessage = "Could not find cookie in "+ path + " for the appID: "+ appId;
+				LOG.error(errorMessage);
+				throw new RuntimeException(errorMessage);
+			}
+		} catch(Exception e) {
+			LOG.error("Exception occurred while fetching cookie for app id: {} Exception: {}", appId, e);
+			throw new RuntimeException(e);
+		}
+
+		LOG.debug("Found cookie for the appID: {}", appId);
+		return cookieFromFile;
+	}
+
+	public static void removeAppState(String appId) {
+		if(appId == null) { return; }
+		String path = System.getProperty("user.home") + File.separator + fileName;
+		LOG.debug("Going to remove the reference for the appId: {} from {}", appId, path);
+		try {
+			File f = new File(path);
+			if (!f.exists()) {
+				String errorMessage = "Could not find the file: " + path + " in user home directory";
+				LOG.warn(errorMessage);
+
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77182333 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) { return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); } + public static void persistAppState(String appId, String cookie) { + if(appId == null || cookie == null) { return; } + String path = System.getProperty("user.home") + File.separator + fileName; + LOG.debug("Going to persist cookie for the appID: {} in {} ", appId, path); + try { + File f = new File(path); + if(!f.exists()) { + f.createNewFile(); + } + HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path); + SubnodeConfiguration subNode = config.getSection(appId); + if (subNode.containsKey(cookieKey)) { + String errorMessage = "Secure Cookie is already found in "+ path + " for the appID: "+ appId; + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + subNode.addProperty(cookieKey, cookie); + config.save(); + LOG.debug("Persisted cookie for the appID: {}", appId); + } catch(Exception e) { + LOG.error("Exception occurred while persisting app state for app id: {}. Exception: {}", appId, e); + throw new RuntimeException(e); + } + } + + public static String getAppSecureCookie(String appId) { + if(appId == null) { + String errorMessage = "Application ID cannot be null"; + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + + String cookieFromFile; + String path = System.getProperty("user.home") + File.separator + fileName; + LOG.debug("Going to fetch cookie for the appID: {} from {}", appId, path); + + try { + File f = new File(path); + if (!f.exists()) { + String errorMessage = "Could not find the file: " + path + " in user home directory"; + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path); + SubnodeConfiguration subNode = config.getSection(appId); + if (!subNode.containsKey(cookieKey)) { + String errorMessage = "Could not find the app ID section in "+ path + " for the appID: "+ appId; + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + cookieFromFile = subNode.getString(cookieKey, ""); + if(cookieFromFile.length() == 0) { + String errorMessage = "Could not find cookie in "+ path + " for the appID: "+ appId; + LOG.error(errorMessage); + throw new RuntimeException(errorMessage); + } + } catch(Exception e) { + LOG.error("Exception occurred while fetching cookie for app id: {} Exception: {}", appId, e); + throw new RuntimeException(e); + } + + LOG.debug("Found cookie for the appID: {}", appId); + return cookieFromFile; + } + + public static void removeAppState(String appId) { + if(appId == null) { return; } + String path = System.getProperty("user.home") + File.separator + fileName; + LOG.debug("Going to remove the reference for the appId: {} from {}", appId, path); + try { + File f = new File(path); + if (!f.exists()) { + String errorMessage = "Could not find the file: " + path + " in user home directory"; + LOG.warn(errorMessage); + return; + } + HierarchicalINIConfiguration config = new HierarchicalINIConfiguration(path); + config.clearTree(appId); + config.save(); +
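The persistence scheme under review keeps one INI section per application ID in a file under the user's home directory. A minimal runnable sketch of the same flow is below, using `java.util.Properties` with flat `<appId>.cookie` keys as a JDK-only stand-in for Commons Configuration's `HierarchicalINIConfiguration`; the class name `AppCookieStore` and the key layout are illustrative, not the PR's actual code:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Stand-in for the persistAppState/getAppSecureCookie/removeAppState trio:
// one properties file, one "<appId>.cookie" entry per application.
public class AppCookieStore {
    private final Path file;

    public AppCookieStore(Path file) { this.file = file; }

    public void persist(String appId, String cookie) throws IOException {
        if (appId == null || cookie == null) {
            return; // mirrors the PR's silent no-op on null input
        }
        Properties props = load();
        String key = appId + ".cookie";
        if (props.containsKey(key)) {
            // fail fast instead of silently overwriting an existing cookie
            throw new IllegalStateException("Cookie already stored for appId: " + appId);
        }
        props.setProperty(key, cookie);
        save(props);
    }

    public String fetch(String appId) throws IOException {
        String cookie = load().getProperty(appId + ".cookie", "");
        if (cookie.isEmpty()) {
            throw new IllegalStateException("No cookie found for appId: " + appId);
        }
        return cookie;
    }

    public void remove(String appId) throws IOException {
        Properties props = load();
        props.remove(appId + ".cookie");
        save(props);
    }

    private Properties load() throws IOException {
        Properties props = new Properties();
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                props.load(in);
            }
        }
        return props;
    }

    private void save(Properties props) throws IOException {
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "flink secure cookies");
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("cookies", ".properties");
        AppCookieStore store = new AppCookieStore(tmp);
        store.persist("app_123", "s3cret");
        System.out.println(store.fetch("app_123")); // prints s3cret
        store.remove("app_123");
        Files.delete(tmp);
    }
}
```

As in the diff, persisting twice for the same appId is treated as an error rather than an overwrite, and fetching a missing cookie throws instead of returning an empty string.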
[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2450 I think these changes are good. As a very personal and biased statement: I think the code overdoes it a bit with the "every parameter on a new line" policy. We have beautiful wide screens and I get sore fingers from scrolling up and down again to figure out enough context of a code passage ;-) Also, the style breaks the "visual" separation between different parts of statement, like "members of a return tuple" vs. "function parameters", or "clauses of an if statement" and "body of the if statement". I know the idea of that pattern was to improve code readability, but I am getting the feeling this is approaching a "verschlimmbesserung" (an improvement that makes things worse). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455531#comment-15455531 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77182101 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -682,6 +774,91 @@ public static File getYarnPropertiesLocation(Configuration conf) { return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser); } + public static void persistAppState(String appId, String cookie) { + if(appId == null || cookie == null) { return; } --- End diff -- please put `return` into a new line. > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature > Components: Security >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455524#comment-15455524 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77181663 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -439,8 +450,8 @@ public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean read case "quit": case "stop": yarnCluster.shutdownCluster(); + if (yarnCluster.hasBeenShutdown()) { removeAppState(applicationId); } --- End diff -- This is not consistent with the style in the rest of the Flink codebase. Can you put the "remoteAppState()" into a separate line?
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455520#comment-15455520 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77181460 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -597,6 +610,11 @@ public static ContainerLaunchContext createTaskManagerContext( containerEnv.put(YarnConfigKeys.ENV_CLIENT_USERNAME, yarnClientUsername); + final String secureCookie = ENV.get(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE); + if(secureCookie != null) { + containerEnv.put(YarnConfigKeys.ENV_SECURE_AUTH_COOKIE, secureCookie); --- End diff -- The problem here is that the secure cookie will be put into the environment of the TaskManager JVM, so it'll be quite easy to just read the environment variables (not sure if that is an issue). Another issue is that YARN is by default launching processes by creating a temporary bash file, with all the environment variables and the JVM invocation. So the secure cookie will be written into some tmp directory on YARN. I wonder if there's some infrastructure in YARN to transfer the tokens in a secure way.
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455509#comment-15455509 ] ASF GitHub Bot commented on FLINK-4458: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2450#discussion_r77180730 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala --- @@ -69,7 +69,7 @@ abstract class FlinkMiniCluster( ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, InetAddress.getByName("localhost").getHostAddress()) - val configuration = generateConfiguration(userConfiguration) + protected val _configuration = generateConfiguration(userConfiguration) --- End diff -- Why the underscore here? > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455506#comment-15455506 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77180355 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.ConfigConstants; +import org.junit.BeforeClass; + +import java.io.IOException; + +import static org.junit.Assert.fail; + +public class BlobClientSecureTest extends BlobClientTest { + + /** +* Starts the BLOB server with secure cookie enabled configuration +*/ + @BeforeClass + public static void startServer() { + try { + config.setBoolean(ConfigConstants.SECURITY_ENABLED, true); + config.setString(ConfigConstants.SECURITY_COOKIE, "foo"); + BLOB_SERVER = new BlobServer(config); + } + catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } --- End diff -- Maybe it makes sense to add a test ensuring that clients with a wrong cookie are rejected: ``` @Test(expected = IOException.class) public void testInvalidSecurityCookie() throws IOException { BlobClient client = null; try { byte[] testBuffer = createTestBuffer(); MessageDigest md = BlobUtils.createMessageDigest(); md.update(testBuffer); InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); client = new BlobClient(serverAddress, "different"); // Store some data client.put(testBuffer); } finally { if (client != null) { try { client.close(); } catch (Throwable t) {} } } } ```
[jira] [Commented] (FLINK-4510) Always create CheckpointCoordinator
[ https://issues.apache.org/jira/browse/FLINK-4510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455491#comment-15455491 ] ASF GitHub Bot commented on FLINK-4510: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2453 Thanks for looking into this. I wonder if we can do this simpler, without changes to the CheckpointCoordinator. The only thing that really needs to change is that without periodic checkpoints, the `startCheckpointScheduler()` method is not called, which means that the JobStatusListener that "CheckpointActivatorDeactivator" should not be created and started. > Always create CheckpointCoordinator > --- > > Key: FLINK-4510 > URL: https://issues.apache.org/jira/browse/FLINK-4510 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Jark Wu > > The checkpoint coordinator is only created if a checkpointing interval is > configured. This means that no savepoints can be triggered if there is no > checkpointing interval specified. > Instead we should always create it and allow an interval of 0 for disabled > periodic checkpoints. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
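Stephan's suggestion amounts to: construct the CheckpointCoordinator unconditionally (so savepoints always have a coordinator to go through) and make only the registration of the scheduling listener conditional on a positive interval. A sketch with simplified stand-in types (none of these are the real Flink classes):

```java
import java.util.ArrayList;
import java.util.List;

// Coordinator stub: savepoint triggering is always available; the
// periodic scheduler is only started via the listener below.
class CheckpointCoordinator {
    void triggerSavepoint() { /* always possible once the coordinator exists */ }
    void startCheckpointScheduler() { /* begins periodic checkpointing */ }
}

interface JobStatusListener { void jobStatusChanged(String newStatus); }

// Stand-in for the "CheckpointActivatorDeactivator" job status listener.
class CheckpointActivatorDeactivator implements JobStatusListener {
    private final CheckpointCoordinator coordinator;
    CheckpointActivatorDeactivator(CheckpointCoordinator c) { this.coordinator = c; }
    public void jobStatusChanged(String newStatus) {
        if ("RUNNING".equals(newStatus)) {
            coordinator.startCheckpointScheduler();
        }
    }
}

class ExecutionGraphSketch {
    final List<JobStatusListener> listeners = new ArrayList<>();
    final CheckpointCoordinator coordinator;

    ExecutionGraphSketch(long checkpointIntervalMillis) {
        // Always create the coordinator, even with periodic checkpoints disabled,
        // so savepoints can be triggered without a configured interval.
        this.coordinator = new CheckpointCoordinator();
        // Only wire up periodic scheduling when an interval is configured;
        // interval <= 0 means "periodic checkpoints disabled" here.
        if (checkpointIntervalMillis > 0) {
            listeners.add(new CheckpointActivatorDeactivator(coordinator));
        }
    }
}
```

With this shape, disabling periodic checkpoints changes only whether the listener is registered; the coordinator itself needs no changes.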
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455477#comment-15455477 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77178647 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSecureTest.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.configuration.ConfigConstants; +import org.junit.BeforeClass; + +import java.io.IOException; + +import static org.junit.Assert.fail; + +public class BlobClientSecureTest extends BlobClientTest { --- End diff -- Why are tests like the `testContentAddressableBuffer()` not failing, even if they pass a `null` security cookie? 
[jira] [Created] (FLINK-4557) Table API Stream Aggregations
Timo Walther created FLINK-4557: --- Summary: Table API Stream Aggregations Key: FLINK-4557 URL: https://issues.apache.org/jira/browse/FLINK-4557 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther The Table API is a declarative API to define queries on static and streaming tables. So far, only projection, selection, and union are supported operations on streaming tables. This issue and the corresponding FLIP proposes to add support for different types of aggregations on top of streaming tables. In particular, we seek to support: *Group-window aggregates*, i.e., aggregates which are computed for a group of elements. A (time or row-count) window is required to bound the infinite input stream into a finite group. *Row-window aggregates*, i.e., aggregates which are computed for each row, based on a window (range) of preceding and succeeding rows. Each type of aggregate shall be supported on keyed/grouped or non-keyed/grouped data streams for streaming tables as well as batch tables. Since time-windowed aggregates will be the first operation that require the definition of time, we also need to discuss how the Table API handles time characteristics, timestamps, and watermarks. The FLIP can be found here: https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455463#comment-15455463 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77177744 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala --- @@ -2041,6 +2044,18 @@ object JobManager { null } +val secureAuth = configuration.getBoolean(ConfigConstants.SECURITY_ENABLED, + ConfigConstants.DEFAULT_SECURITY_ENABLED) +if(secureAuth == true) { + val secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null) + if(secureCookie == null) { +val message = "Missing " + ConfigConstants.SECURITY_COOKIE + + " configuration in Flink configuration file" +LOG.error(message) +throw new RuntimeException(message) + } +} --- End diff -- As I said before, this is duplicate code that can be encapsulated into a `SecurityUtils` static method.
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455465#comment-15455465 ] ASF GitHub Bot commented on FLINK-3930: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2425#discussion_r77177808 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -1470,6 +1470,18 @@ object TaskManager { null } +val secureAuth = configuration.getBoolean(ConfigConstants.SECURITY_ENABLED, + ConfigConstants.DEFAULT_SECURITY_ENABLED) +if(secureAuth == true) { + val secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null) + if(secureCookie == null) { +val message = "Missing " + ConfigConstants.SECURITY_COOKIE + + " configuration in Flink configuration file" +LOG.error(message) +throw new RuntimeException(message) + } +} + --- End diff -- Duplicate code
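The block flagged as duplicated in both the JobManager and TaskManager reviews could collapse into one static helper, roughly as below. This is a sketch: `SecurityUtilsSketch` and the Map-based configuration are stand-ins for the `SecurityUtils` method and the Flink `Configuration` object the reviewer refers to; only the key names mirror `ConfigConstants.SECURITY_ENABLED` / `SECURITY_COOKIE` from the diff.

```java
import java.util.Map;

// One static validation method that both JobManager and TaskManager
// startup paths can call instead of repeating the cookie check.
public final class SecurityUtilsSketch {
    static final String SECURITY_ENABLED = "security.enabled";
    static final String SECURITY_COOKIE = "security.cookie";

    private SecurityUtilsSketch() {}

    /**
     * Returns the configured cookie when security is enabled,
     * or null when security is disabled. Throws if security is
     * enabled but no cookie is configured.
     */
    public static String validateSecureCookie(Map<String, String> config) {
        boolean secureAuth =
            Boolean.parseBoolean(config.getOrDefault(SECURITY_ENABLED, "false"));
        if (!secureAuth) {
            return null;
        }
        String cookie = config.get(SECURITY_COOKIE);
        if (cookie == null) {
            throw new RuntimeException(
                "Missing " + SECURITY_COOKIE + " configuration in Flink configuration file");
        }
        return cookie;
    }
}
```

Centralizing the check also keeps the error message consistent across the JobManager and TaskManager startup paths.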
[GitHub] flink pull request #2425: FLINK-3930 Added shared secret based authorization...
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2425#discussion_r77177744

--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -2041,6 +2044,18 @@ object JobManager {
         null
       }
+
+    val secureAuth = configuration.getBoolean(ConfigConstants.SECURITY_ENABLED,
+      ConfigConstants.DEFAULT_SECURITY_ENABLED)
+    if(secureAuth == true) {
+      val secureCookie = configuration.getString(ConfigConstants.SECURITY_COOKIE, null)
+      if(secureCookie == null) {
+        val message = "Missing " + ConfigConstants.SECURITY_COOKIE +
+          " configuration in Flink configuration file"
+        LOG.error(message)
+        throw new RuntimeException(message)
+      }
+    }
--- End diff --

As I said before, this is duplicate code that can be encapsulated into a `SecurityUtils` static method.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
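The extraction the reviewer asks for could look roughly like the sketch below. Everything here is illustrative: `validateSecureCookie` and the plain string-map configuration are invented for the example and are not Flink's actual `SecurityUtils` API or `Configuration` type.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the suggested helper: the cookie check duplicated in
 * JobManager.scala and TaskManager.scala pulled into one static method.
 */
public class SecurityUtilsSketch {

    static final String SECURITY_ENABLED = "security.enabled"; // illustrative keys
    static final String SECURITY_COOKIE = "security.cookie";

    /**
     * Returns the configured cookie when security is enabled, or null when it is
     * disabled; fails fast if security is on but no cookie is configured.
     */
    public static String validateSecureCookie(Map<String, String> config) {
        boolean secureAuth =
                Boolean.parseBoolean(config.getOrDefault(SECURITY_ENABLED, "false"));
        if (!secureAuth) {
            return null; // security disabled: nothing to validate
        }
        String secureCookie = config.get(SECURITY_COOKIE);
        if (secureCookie == null) {
            throw new RuntimeException("Missing " + SECURITY_COOKIE
                    + " configuration in Flink configuration file");
        }
        return secureCookie;
    }

    public static void main(String[] args) {
        System.out.println(validateSecureCookie(
                Map.of(SECURITY_ENABLED, "true", SECURITY_COOKIE, "s3cret"))); // s3cret
        System.out.println(validateSecureCookie(Map.of())); // null
    }
}
```

Both the JobManager and TaskManager startup paths could then call the single helper instead of each carrying its own copy of the check.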
[jira] [Commented] (FLINK-4455) Replace ActorGateways in NetworkEnvironment by interfaces
[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455428#comment-15455428 ]

ASF GitHub Bot commented on FLINK-4455:
---

Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/2449#discussion_r77175883

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
@@ -18,130 +18,88 @@
 package org.apache.flink.runtime.io.network;

-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;

 import java.io.IOException;

-import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;

 /**
  * Network I/O components of each {@link TaskManager} instance. The network environment contains
  * the data structures that keep track of all intermediate results and all data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network buffer pool.
- * All other components (netty, intermediate result managers, ...) are only created once the
- * environment is "associated" with a TaskManager and JobManager. This happens as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {

 	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);

 	private final Object lock = new Object();

-	private final NetworkEnvironmentConfiguration configuration;
-
-	private final FiniteDuration jobManagerTimeout;
-
 	private final NetworkBufferPool networkBufferPool;

-	private ConnectionManager connectionManager;
+	private final ConnectionManager connectionManager;

-	private ResultPartitionManager partitionManager;
+	private final ResultPartitionManager resultPartitionManager;

-	private TaskEventDispatcher taskEventDispatcher;
-
-	private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-	private PartitionStateChecker partitionStateChecker;
+	private final TaskEventDispatcher taskEventDispatcher;

 	/** Server for {@link org.apache.flink.runtime.state.KvState} requests. */
-	private KvStateServer kvStateServer;
+	private
[GitHub] flink issue #2449: [FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnviro...
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/2449

+1 for moving the `PartitionStateChecker` and `ResultPartitionConsumableNotifier` out of the `NetworkEnvironment`.

A few questions and comments:

- Do we need an extra ExecutorService in the TaskManager? I have been digging through a bunch of thread dumps over time, and there are already many threads and pools. I would really like to avoid yet another thread pool (creating thread pools should be an extremely careful decision). The Akka thread pool executor is quite over-provisioned for the few actors we actually use, so I think it is perfectly feasible to use that one for the few extra futures introduced here. In any case, if the Akka executor pool is not reused, the new thread pool needs to be shut down in the TaskManager runner. Otherwise it creates a leak when running successive local Flink jobs.

- I am a bit confused about the `SlotEnvironment`. Maybe it is mainly the name, but what does it have to do with slots? Is it not more like a network-messages-specific *JobManager connection*?

- The `ResultPartitionConsumableNotifier` could be per `Task` - that way, future multi-JobManager associations would work seamlessly, and it could directly call `fail(...)` on the Task without having to go through the `TaskManager`. It could probably leave the TaskManager out of the picture completely.
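The leak warned about in the first bullet is easy to demonstrate in isolation. The sketch below is illustrative, not Flink code: a dedicated `ExecutorService` holds non-daemon worker threads alive until someone explicitly shuts it down, so a component that creates a pool per run and never shuts it down leaks threads across successive local jobs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of why a dedicated pool must be shut down by its owner. */
public class PoolShutdownSketch {

    /** Stand-in for the "few extra futures" a component might run on the pool. */
    public static int runOnPool(ExecutorService pool) throws Exception {
        Future<Integer> result = pool.submit(() -> 21 * 2);
        return result.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(runOnPool(pool)); // 42
        // Without these two lines, the two worker threads stay alive after the
        // "job" completes -- exactly the leak described for successive local runs.
        pool.shutdown();
        System.out.println(pool.awaitTermination(5, TimeUnit.SECONDS)); // true
    }
}
```

Reusing an existing executor (as suggested with the Akka dispatcher) sidesteps the problem entirely, because lifecycle ownership stays in one place.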
> Replace ActorGateways in NetworkEnvironment by interfaces
> ---------------------------------------------------------
>
>                 Key: FLINK-4455
>                 URL: https://issues.apache.org/jira/browse/FLINK-4455
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network, TaskManager
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the
> dependency on actors.
> In terms of modularization and an improved abstraction (especially wrt
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which
> expose the required methods. The current implementation would then simply
> wrap the method calls in messages and send them via the {{ActorGateway}} to
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement
> these interfaces as part of their RPC contract.
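The interface extraction proposed in the issue can be sketched as follows. The names (`TaskManagerActions`, `failTask`) are invented for illustration and are not Flink's actual interfaces: the point is only the shape of the refactoring, where callers depend on a narrow interface and the current actor-based path becomes one implementation that wraps each call in a message.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the narrow interface the issue proposes.
interface TaskManagerActions {
    void failTask(String executionAttemptId, Throwable cause);
}

// Actor-backed implementation: wraps the call in a message. Here the "sent"
// messages are just recorded in a list instead of going through an ActorGateway.
class ActorGatewayTaskManagerActions implements TaskManagerActions {
    final List<String> sentMessages = new ArrayList<>();

    @Override
    public void failTask(String executionAttemptId, Throwable cause) {
        sentMessages.add("FailTask(" + executionAttemptId + ", " + cause.getMessage() + ")");
    }
}

public class InterfaceExtractionSketch {
    public static void main(String[] args) {
        ActorGatewayTaskManagerActions impl = new ActorGatewayTaskManagerActions();
        // Components such as the NetworkEnvironment would see only the interface:
        TaskManagerActions actions = impl;
        actions.failTask("attempt-42", new RuntimeException("partition lost"));
        System.out.println(impl.sentMessages.get(0)); // FailTask(attempt-42, partition lost)
    }
}
```

Under this shape, a Flip-6 `JobMaster` or `TaskExecutor` could implement the same interface directly as part of its RPC contract, with no actor round-trip.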
[jira] [Commented] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455423#comment-15455423 ]

ASF GitHub Bot commented on FLINK-3930:
---

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2425#discussion_r77175545

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -530,6 +529,7 @@ public void uploadRequiredJarFiles(InetSocketAddress serverAddress) throws IOExc
 			}
 		}
 	}
+	*/
--- End diff --

I think it's better to completely delete the method instead of commenting it out.