[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931953#comment-15931953 ] Geoffrey Mon commented on FLINK-5183: - Thanks for your help! > [py] Support multiple jobs per Python plan file > --- > > Key: FLINK-5183 > URL: https://issues.apache.org/jira/browse/FLINK-5183 > Project: Flink > Issue Type: Improvement > Components: Python API >Affects Versions: 1.1.3 >Reporter: Geoffrey Mon >Assignee: Geoffrey Mon >Priority: Minor > Fix For: 1.3.0 > > > Support running multiple jobs per Python plan file. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931933#comment-15931933 ] ASF GitHub Bot commented on FLINK-5183: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3232
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931633#comment-15931633 ] ASF GitHub Bot commented on FLINK-5183: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3232 I will now try this out and merge it if it works.
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931540#comment-15931540 ] ASF GitHub Bot commented on FLINK-5183: --- Github user GEOFBOT commented on the issue: https://github.com/apache/flink/pull/3232 Resolved merge conflicts
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15870581#comment-15870581 ] ASF GitHub Bot commented on FLINK-5183: --- Github user GEOFBOT commented on the issue: https://github.com/apache/flink/pull/3232

> It may have worked with a smaller file, but there may be issues with heavier jobs.

How silly of me. This problem had nothing to do with this pull request, with YARN, with issues in Flink, or with the size of the input file at all. I was using `ExecutionEnvironment.from_elements` to generate a large sequence of indexed zeroes to fill the gaps in another indexed DataSet. With larger input files, I set larger parameters and therefore generated larger zero sequences. Because I was using `from_elements`, the client had to send all of those values (lots and lots of zeroes) to the runtime, which was very time-consuming and caused the timeout.

I have replaced this with a `generate_sequence` call and a map function, which does not require sending lots of values from the client to the runtime, and the job (and this pull request) now seem to work just fine. (Change in question: https://github.com/quinngroup/pyflink-r1dl/commit/00a16d564bfad21fc1f4958677ada0a95fa9f088)
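To make the size difference concrete, here is a plain-Python model that needs no Flink installation. The plan dictionaries and the helpers `from_elements_plan`, `generate_sequence_plan`, `zero_fill` and `expand` are hypothetical stand-ins for what a client ships to the runtime, not the Flink Python API; JSON is assumed as the wire format purely for measurement. The point it illustrates: a source that embeds its elements grows with the data, while a sequence source only carries its bounds and lets a map function produce the zeroes on the workers.

```python
import json
from typing import Any, Dict, Iterable, List, Tuple

Plan = Dict[str, Any]


def from_elements_plan(elements: Iterable[Tuple[int, int]]) -> Plan:
    # Every element is embedded in the plan, so its size grows with the data.
    return {"source": "from_elements", "elements": [list(e) for e in elements]}


def generate_sequence_plan(start: int, end: int) -> Plan:
    # Only the bounds are embedded; the values are produced on the workers.
    return {"source": "generate_sequence", "start": start, "end": end}


def zero_fill(index: int) -> Tuple[int, int]:
    # The "map function": turn each generated index into an (index, 0) pair.
    return (index, 0)


def shipped_bytes(plan: Plan) -> int:
    # Bytes the client would send if the plan were serialized as JSON.
    return len(json.dumps(plan).encode("utf-8"))


def expand(plan: Plan) -> List[Tuple[int, int]]:
    # What the runtime ends up with; both plan kinds can describe the same data.
    if plan["source"] == "from_elements":
        return [tuple(e) for e in plan["elements"]]
    # Bounds are inclusive in this sketch.
    return [zero_fill(i) for i in range(plan["start"], plan["end"] + 1)]
```

In this model, 100,000 indexed zeroes cost roughly 1.2 MB as an element list but under 100 bytes as a sequence, and the sequence plan stays the same size however large the range gets.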
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859800#comment-15859800 ] ASF GitHub Bot commented on FLINK-5183: --- Github user GEOFBOT commented on the issue: https://github.com/apache/flink/pull/3232

It may have worked with a smaller file, but there may be issues with heavier jobs. When I ran a more computationally intensive and time-consuming job, the first job of the Python file ran successfully. The second job of the file was then submitted:

```
02/09/2017 16:39:43 DataSink (CsvSink)(4/5) switched to FINISHED
02/09/2017 16:39:43 Job execution switched to status FINISHED.
2017-02-09 16:40:26,470 INFO org.apache.flink.yarn.YarnClusterClient - Waiting until all TaskManagers have connected
Waiting until all TaskManagers have connected
2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - TaskManager status (5/5)
TaskManager status (5/5)
2017-02-09 16:40:26,476 INFO org.apache.flink.yarn.YarnClusterClient - All TaskManagers are connected
All TaskManagers are connected
2017-02-09 16:40:26,480 INFO org.apache.flink.yarn.YarnClusterClient - Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Submitting job with JobID: b226f5f18a78bc386bd1b1b6d30515ea. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@.ec2.internal:35598/user/jobmanager#68430682]
```

However, Flink does not receive or respond to this new job. Instead, the client terminates with a timeout error:

```
Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
	at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
	at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:239)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
	at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
```

I tried setting `akka.client.timeout` to 20 minutes, but Flink still does not receive the second job. I suspect this may be an issue with this patch.
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859496#comment-15859496 ] ASF GitHub Bot commented on FLINK-5183: --- Github user GEOFBOT commented on the issue: https://github.com/apache/flink/pull/3232

In the process of getting a more complex job to run, I've tested a basic multi-job file (seen below) on an Amazon EMR YARN cluster, and it ran successfully.

```
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING, WriteMode
from flink.functions.GroupReduceFunction \
    import GroupReduceFunction
from flink.functions.Aggregation import Sum

import sys

if __name__ == "__main__":
    output_file = 'hdfs:/tmp/out.txt'
    output_file2 = 'hdfs:/tmp/out2.txt'

    # Job 1: sum field 1 per key in field 0 and write the result as CSV
    env = get_environment()
    data = env.from_elements((0,1),(1,2),(2,3),(2,10))

    data \
        .group_by(0) \
        .aggregate(Sum, 1) \
        .write_csv(output_file, write_mode=WriteMode.OVERWRITE)

    env.execute()

    # Job 2: read job 1's output back and keep the first two records
    env2 = get_environment()
    data2 = env2.read_csv(output_file, (INT, INT))

    data2 \
        .first(2) \
        .write_text(output_file2, write_mode=WriteMode.OVERWRITE)

    env2.execute()
```
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15854045#comment-15854045 ] ASF GitHub Bot commented on FLINK-5183: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/3232 Yes, please continue to comment after pushing changes :) I'll have to try this out on a cluster to be certain, but from the looks of it this is good to merge.
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15847734#comment-15847734 ] ASF GitHub Bot commented on FLINK-5183: --- Github user GEOFBOT commented on the issue: https://github.com/apache/flink/pull/3232 Thanks for the comments, I've addressed them with the latest commit.
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840525#comment-15840525 ] Geoffrey Mon commented on FLINK-5183: - That solved the problem. I was really overthinking it: I was actually planning on spawning another thread to check whether the Python process had died and to close the PythonPlanStreamer ServerSocket from that thread. Once I add a simple unit test and make sure that the feature works as expected, I'll create a pull request so that the code itself can be reviewed.
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840400#comment-15840400 ] Chesnay Schepler commented on FLINK-5183: - Awesome stuff. Could you not fix the second issue by specifying a 5-second timeout on the ServerSocket?
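The timeout idea can be sketched in Python (PythonPlanBinder itself is Java, so this only mirrors the pattern): give the listening socket a timeout so that `accept()` wakes up periodically, and on each timeout check whether the plan process is still alive, stopping once it has exited instead of waiting forever. `accept_jobs` and the `process_alive` callback are hypothetical names, not part of Flink.

```python
import socket
from typing import Callable


def accept_jobs(server: socket.socket,
                process_alive: Callable[[], bool],
                timeout_s: float = 5.0) -> int:
    """Accept job connections until the plan process has exited.

    `process_alive` is a hypothetical stand-in for a liveness check on the
    Python process. Returns the number of connections accepted.
    """
    server.settimeout(timeout_s)  # accept() now raises socket.timeout when idle
    accepted = 0
    while True:
        try:
            conn, _addr = server.accept()
        except socket.timeout:
            if not process_alive():
                return accepted  # producer is gone: stop waiting for more jobs
            continue  # producer still running: keep waiting
        with conn:
            accepted += 1  # a real binder would read and run the plan here
```

Queued connections are still returned by `accept()` before any timeout fires, so a job submitted just before the process exits is not lost in this sketch.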
[jira] [Commented] (FLINK-5183) [py] Support multiple jobs per Python plan file
[ https://issues.apache.org/jira/browse/FLINK-5183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839961#comment-15839961 ] Geoffrey Mon commented on FLINK-5183: - I'm currently working on this at https://github.com/geofbot/flink/tree/FLINK-5183. So far I've managed to get it to work except for a few issues:

* Execution environments need a unique ID that can be used to identify them between Java and Python. At the moment, these IDs are manually assigned.
* PythonPlanBinder does not exit when the Python process exits, but instead waits indefinitely for more jobs.
* Global variables are used when running operators in Python to skip over execution environments that do not contain the operator to be run (there should be a better solution).
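One possible way to address the first and third points is to draw environment IDs from a shared counter and look environments up by ID instead of consulting global flags. A minimal sketch, where `PlanEnvironment` and `PlanRegistry` are hypothetical classes, not the Flink Python API:

```python
import itertools
from typing import Dict, List


class PlanEnvironment:
    """Hypothetical Python-side environment (not Flink's class).

    IDs come from one process-wide counter, so they are unique within a
    plan file without being assigned by hand.
    """

    _next_id = itertools.count()

    def __init__(self) -> None:
        self.env_id: int = next(PlanEnvironment._next_id)
        self.operators: List[str] = []  # operator names, for illustration


class PlanRegistry:
    """Maps environment IDs to environments instead of using global flags."""

    def __init__(self) -> None:
        self._envs: Dict[int, PlanEnvironment] = {}

    def register(self, env: PlanEnvironment) -> PlanEnvironment:
        self._envs[env.env_id] = env
        return env

    def operators_for(self, env_id: int) -> List[str]:
        # Only the requested environment's operators are returned, so there
        # is nothing to skip over for other environments.
        return self._envs[env_id].operators
```

With this shape, the Java side would only need to send back an environment ID to select which operators to run.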