Re: Flink's Checking and uploading JAR files Issue
Hi,

I rechecked that I put all my jars in the lib folder. I have also noticed
that it fails while loading my first POJO class. I start the cluster via
YARN, using Flink 0.9.1.

Thanks,
Hanan Meyer

On Thu, Sep 24, 2015 at 6:08 PM, Stephan Ewen wrote:

> My first guess would be that you did not put all jars into the lib folder.
>
> To help us understand this, do you start the cluster manually, or via YARN?
>
> On Thu, Sep 24, 2015 at 4:59 PM, Hanan Meyer wrote:
>
> > Hi,
> > Thanks for the fast response.
> > I have tried the workaround of excluding the jars from the
> > RemoteEnvironment's init line:
> >
> >     ExecutionEnvironment env =
> >         ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT);
> >
> > instead of:
> >
> >     ExecutionEnvironment env =
> >         ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT, list of Jars ..);
> >
> > I copied the jars to Flink's lib folder, and when I submit my job I get
> > the following exception, caused by Flink not finding my jars and types:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The program
> > execution failed: Cannot initialize task 'CHAIN DataSource
> > (at createInput(ExecutionEnvironment.java:502)
> > (org.apache.flink.api.java.io.AvroInputFormat)) ->
> > Filter (Filter at generateCsv(FlinkCSVProducer.java:51)) -> FlatMap
> > (FlatMap at generateCsv(FlinkCSVProducer.java:78))':
> > Deserializing the InputFormat (File Input
> > (hdfs://localhost:9000/data/kpi/38fbbdef-d822-4e13-9031-faff907469df))
> > failed:
> > Could not read the user code wrapper: com.scalabill.it.pa.event.Event
> >   at org.apache.flink.client.program.Client.run(Client.java:413)
> >   at org.apache.flink.client.program.Client.run(Client.java:356)
> >   at org.apache.flink.client.program.Client.run(Client.java:349)
> >   at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> >   at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> >   at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> >   at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> >   at org.apache.flink.api.java.DataSet.count(DataSet.java:391)
> >   at com.scalabill.it.pa.core.FlinkCSVProducer.generateCsv(FlinkCSVProducer.java:70)
> >   at com.scalabill.it.pa.core.FlinkDriver.generateChannelsCSVsforThisBackendServer(FlinkDriver.java:94)
> >
> > Have I done the workaround correctly?
> > Can you try to reproduce it in your environment?
> >
> > Thanks for your attention!
> > Hanan Meyer
> >
> > On Thu, Sep 24, 2015 at 4:58 PM, Till Rohrmann wrote:
> >
> > > Hi Hanan,
> > >
> > > you're right that currently, every time you submit a job to the Flink
> > > cluster, all user-code jars are uploaded and overwrite possibly
> > > existing files. This is not really necessary if they don't change.
> > > Maybe we should add a check so that files already existing on the
> > > JobManager are not uploaded again by the JobClient. This should
> > > improve the performance for your use case.
> > >
> > > The corresponding JIRA issue is
> > > https://issues.apache.org/jira/browse/FLINK-2760.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Sep 24, 2015 at 1:31 PM, Hanan Meyer wrote:
> > >
> > > > Hello all,
> > > >
> > > > I use Flink in order to filter data from HDFS and write it back as
> > > > CSV.
> > > >
> > > > I keep getting the "Checking and uploading JAR files" step on every
> > > > DataSet filtering action or executionEnvironment execution.
> > > >
> > > > I use ExecutionEnvironment.createRemoteEnvironment(ip + jars..)
> > > > because I launch Flink from a J2EE application server.
> > > >
> > > > The jar serialization and transport takes a huge part of the
> > > > execution time. Is there a way to force Flink to pass the jars only
> > > > once?
> > > >
> > > > Please advise.
> > > >
> > > > Thanks,
> > > >
> > > > Hanan Meyer
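A side note on the "Could not read the user code wrapper" failure discussed above: it boils down to the cluster-side classloader not finding the POJO class on its classpath. A tiny JDK-only illustration of that condition (the class name is the one from the stack trace and is, of course, absent here):

```java
// Demonstrates the classloading failure behind "Could not read the user code
// wrapper": the JVM simply cannot resolve the class on its classpath.
public class MissingClassDemo {
    public static void main(String[] args) {
        try {
            // User POJO from the stack trace above; not on this classpath.
            Class.forName("com.scalabill.it.pa.event.Event");
            System.out.println("class found");
        } catch (ClassNotFoundException e) {
            System.out.println("not on classpath: " + e.getMessage());
        }
    }
}
```

On the cluster, the fix is making sure the jar containing that class is either shipped with the job or present in every process's lib folder.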
Re: Flink's Checking and uploading JAR files Issue
Hi,

Thanks for the fast response.
I have tried the workaround of excluding the jars from the
RemoteEnvironment's init line:

    ExecutionEnvironment env =
        ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT);

instead of:

    ExecutionEnvironment env =
        ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT, list of Jars ..);

I copied the jars to Flink's lib folder, and when I submit my job I get the
following exception, caused by Flink not finding my jars and types:

org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Cannot initialize task 'CHAIN DataSource
(at createInput(ExecutionEnvironment.java:502)
(org.apache.flink.api.java.io.AvroInputFormat)) ->
Filter (Filter at generateCsv(FlinkCSVProducer.java:51)) -> FlatMap
(FlatMap at generateCsv(FlinkCSVProducer.java:78))':
Deserializing the InputFormat (File Input
(hdfs://localhost:9000/data/kpi/38fbbdef-d822-4e13-9031-faff907469df))
failed:
Could not read the user code wrapper: com.scalabill.it.pa.event.Event
  at org.apache.flink.client.program.Client.run(Client.java:413)
  at org.apache.flink.client.program.Client.run(Client.java:356)
  at org.apache.flink.client.program.Client.run(Client.java:349)
  at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
  at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
  at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
  at org.apache.flink.api.java.DataSet.count(DataSet.java:391)
  at com.scalabill.it.pa.core.FlinkCSVProducer.generateCsv(FlinkCSVProducer.java:70)
  at com.scalabill.it.pa.core.FlinkDriver.generateChannelsCSVsforThisBackendServer(FlinkDriver.java:94)

Have I done the workaround correctly?
Can you try to reproduce it in your environment?

Thanks for your attention!

Hanan Meyer

On Thu, Sep 24, 2015 at 4:58 PM, Till Rohrmann wrote:

> Hi Hanan,
>
> you're right that currently, every time you submit a job to the Flink
> cluster, all user-code jars are uploaded and overwrite possibly existing
> files. This is not really necessary if they don't change. Maybe we should
> add a check so that files already existing on the JobManager are not
> uploaded again by the JobClient. This should improve the performance for
> your use case.
>
> The corresponding JIRA issue is
> https://issues.apache.org/jira/browse/FLINK-2760.
>
> Cheers,
> Till
>
> On Thu, Sep 24, 2015 at 1:31 PM, Hanan Meyer wrote:
>
> > Hello all,
> >
> > I use Flink in order to filter data from HDFS and write it back as CSV.
> >
> > I keep getting the "Checking and uploading JAR files" step on every
> > DataSet filtering action or executionEnvironment execution.
> >
> > I use ExecutionEnvironment.createRemoteEnvironment(ip + jars..) because
> > I launch Flink from a J2EE application server.
> >
> > The jar serialization and transport takes a huge part of the execution
> > time. Is there a way to force Flink to pass the jars only once?
> >
> > Please advise.
> >
> > Thanks,
> >
> > Hanan Meyer
Flink's Checking and uploading JAR files Issue
Hello all,

I use Flink in order to filter data from HDFS and write it back as CSV.

I keep getting the "Checking and uploading JAR files" step on every DataSet
filtering action or executionEnvironment execution.

I use ExecutionEnvironment.createRemoteEnvironment(ip + jars..) because I
launch Flink from a J2EE application server.

The jar serialization and transport takes a huge part of the execution
time. Is there a way to force Flink to pass the jars only once?

Please advise.

Thanks,

Hanan Meyer
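One approach that comes up in the replies to this thread is staging the user-code jars in Flink's lib folder, so every cluster process has them on its classpath once instead of re-uploading them per job. A hedged sketch of that staging step (paths are placeholders; a scratch directory stands in for a real Flink home, and the jar name is made up):

```shell
# Sketch: stage user-code jars into Flink's lib folder.
# A temp directory stands in for a real FLINK_HOME; adjust paths to your setup.
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/lib"

# Stand-in for the jar holding the job's POJOs / user code.
touch my-user-code.jar
cp my-user-code.jar "$FLINK_HOME/lib/"

# The cluster must be restarted afterwards so the classpath is rebuilt,
# e.g. bin/stop-local.sh && bin/start-local.sh (not run here).
ls "$FLINK_HOME/lib"
rm -f my-user-code.jar
```

Note that, as the rest of this thread shows, jars placed only in lib may still need the matching classes visible to the client-side program, and the lib approach trades upload time for a cluster restart on every jar change.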
Flink ML linear regression issue
Hi,

I'm using Flink ML 0.9.1 in order to perform a multiple linear regression
with a CSV data file. The Scala sample code for it is pretty
straightforward:

    val mlr = MultipleLinearRegression()
    val parameters = ParameterMap()
    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
    parameters.add(MultipleLinearRegression.Iterations, 10)
    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
    val inputDS = env.fromCollection(data)
    mlr.fit(inputDS, parameters)

When I'm using Java (8), the fit method takes 3 parameters:
1. the dataset
2. the parameters
3. an object which implements the FitOperation interface

    multipleLinearRegression.fit(regressionDS, parameters, fitOperation);

Is there a need to implement the FitOperation interface, which has already
been implemented in Flink ML's source code?
Another option is using the MultipleLinearRegression.fitMLR() method, but I
haven't found a way to pass the training dataset to it as a parameter or by
a setter.

I'll be more than happy if you could guide me on how to implement it in
Java.

Thanks,
Hanan Meyer
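As an aside on what those parameters mean: MultipleLinearRegression in Flink ML trains by gradient descent driven by Stepsize, Iterations, and ConvergenceThreshold. A minimal plain-Java sketch of such a training loop for one feature (this is not Flink code; the data and step size are made up for illustration):

```java
// Plain batch gradient descent for y ~ w*x + b, illustrating the roles of
// step size, iteration count, and convergence threshold. Not Flink ML code.
public class LinearFitSketch {

    // Returns {w, b} fitted to the points (x[i], y[i]).
    static double[] fit(double[] x, double[] y, double stepsize,
                        int iterations, double convergenceThreshold) {
        double w = 0.0, b = 0.0;
        double prevLoss = Double.MAX_VALUE;
        int n = x.length;
        for (int it = 0; it < iterations; it++) {
            double gw = 0.0, gb = 0.0, loss = 0.0;
            for (int i = 0; i < n; i++) {
                double err = w * x[i] + b - y[i];
                gw += err * x[i];   // gradient accumulation for w
                gb += err;          // gradient accumulation for b
                loss += err * err;
            }
            w -= stepsize * gw / n;
            b -= stepsize * gb / n;
            loss /= n;
            // Stop early once the loss barely changes between iterations.
            if (Math.abs(prevLoss - loss) < convergenceThreshold) {
                break;
            }
            prevLoss = loss;
        }
        return new double[] { w, b };
    }

    public static void main(String[] args) {
        double[] x = { 1, 2, 3, 4 };
        double[] y = { 2, 4, 6, 8 }; // exactly y = 2x
        double[] wb = fit(x, y, 0.05, 5000, 1e-12);
        System.out.printf("w=%.3f b=%.3f%n", wb[0], wb[1]);
    }
}
```

This is only meant to make the ParameterMap entries above concrete; it says nothing about the Java-facing fit/FitOperation signature, which is the open question here.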
Re: Flink ML source code
Thanks all for the fast response!

I'll check the code in order to plan our development and inform you.
We may need some advice for it.

Thanks,
Hanan

On Fri, Sep 11, 2015 at 11:12 AM, Till Rohrmann wrote:

> Hi Hanan,
>
> if you need any help implementing ANOVA, then let me know. I'd like to
> assist you.
>
> Cheers,
> Till
>
> On Thu, Sep 10, 2015 at 4:15 PM, Robert Metzger wrote:
>
> > Hi,
> >
> > you can find the Flink ML source code here:
> > https://github.com/apache/flink/tree/master/flink-staging/flink-ml
> >
> > On Thu, Sep 10, 2015 at 4:12 PM, Hanan Meyer wrote:
> >
> > > Hi,
> > >
> > > I would like to implement an ANOVA algorithm based on Flink's ML
> > > library. Unfortunately I can't find the Flink 0.10 source code.
> > > Is there any way to get the code? In case I manage to implement my
> > > idea, I'll share it with the community, of course ...
> > >
> > > Thanks,
> > >
> > > Hanan Meyer
Flink ML source code
Hi,

I would like to implement an ANOVA algorithm based on Flink's ML library.
Unfortunately I can't find the Flink 0.10 source code. Is there any way to
get the code? In case I manage to implement my idea, I'll share it with the
community, of course ...

Thanks,

Hanan Meyer
Re: Apache Flink:ProgramInvocationException on Yarn
Hello all.

Firstly, thank you for your valuable advice. We did some very fine-tuned
pinpoint tests and came to the following conclusions:

1. We run Flink for Hadoop 2.7 on Ubuntu 14.
2. Once we copy our Java client program directly to the machine and run it
   there, it works very well. The program is:

    ExecutionEnvironment envRemote =
        ExecutionEnvironment.createRemoteEnvironment("localhost", 6123, "\usr\local\HananTestProj.jar");
    org.apache.flink.api.java.DataSet<String> text =
        (org.apache.flink.api.java.DataSet<String>) envRemote.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
    org.apache.flink.api.java.DataSet<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .groupBy(0)
        .sum(1);
    wordCounts.print();
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

   The program works fine.
3. Now we are trying to run this program remotely, from a Windows machine,
   where the first row looks different:

    ExecutionEnvironment envRemote =
        ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123, "C:\\HananTestProj.jar");

   where 1.2.3.4 is the IP address of the Flink machine.
4. We got an exception: JobManager at 1.2.3.4 can't be reached, bla bla bla.
5. In the Flink configuration we found the following line:

    jobmanager.rpc.address: localhost

   Flink can't be started with any other value (hostname/IP address) except
   localhost.
6. We assume that Flink probably has a critical bug: it can't be reached
   from a remote machine, only locally.

Are we right? Are we wrong? Should we file a JIRA? Maybe we need to
configure Flink differently somehow?

Please advise.

Best regards

On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger wrote:

> The output of the YARN session should look like this:
>
> Flink JobManager is now running on quickstart.cloudera:39956
> JobManager Web Interface:
> http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> Number of connected TaskManagers changed to 1. Slots available: 1
>
> On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen wrote:
>
> > The only thing I can think of is that you are not using the right
> > host/port for the JobManager.
> >
> > When you start the YARN session, it should print the host where the
> > JobManager runs. You also need to take the port from there, as in YARN
> > the port is usually not 6123. YARN starts many services on one machine,
> > so the ports need to be randomized.
> >
> > It may be worth adding a YARNExecutionEnvironment at some point, which
> > deals with this transparently (starting the YARN cluster, connecting to
> > the JobManager).
> >
> > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer wrote:
> >
> > > Hello.
> > > Let me clarify the situation.
> > > 1. We are using Flink 0.9.0 for Hadoop 2.7. We connected it to HDFS
> > > 2.7.1.
> > > 2. Locally, our program is working: once we run Flink via
> > > ./start-local.sh, we are able to connect and run the
> > > createRemoteEnvironment and execute methods.
> > > 3. Due to our architecture and a basic Flink feature, we want to
> > > invoke this functionality REMOTELY, with our Java code calling the
> > > Flink methods from another server.
> > > 4. We tried both
> > > ExecutionEnvironment.createRemoteEnvironment("1.2.3.1", 6123, "TestProj.jar");
> > > and
> > > ExecutionEnvironment.createRemoteEnvironment("flink@1.2.3.1", 6123, "TestProj.jar");
> > > (which is definitely not right, since it should be an IP address) -
> > > both crash with the "can't reach JobManager" error.
> > >
> > > It seems to us that it can be one of 2 issues:
> > > 1. Somehow we need to configure Flink to accept connections from the
> > > remote machine.
> > > 2. Flink has a critical showstopper bug that jeopardizes the whole
> > > decision to use this technology.
> > >
> > > Please advise us how we should proceed.
> > >
> > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger wrote:
> > >
> > > > Hi,
> > > >
> > > > in the exception you've posted earlier, you can see the following
> > > > root cause:
> > > >
> > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
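For what it's worth, the configuration line found in point 5 of the message above is usually adjusted in conf/flink-conf.yaml rather than in client code: with localhost there, the JobManager announces a loopback address that remote clients cannot reach. A sketch of the relevant entries (1.2.3.4 is a placeholder for the real JobManager host; whether this alone resolves the problem reported here is not confirmed in the thread):

```yaml
# conf/flink-conf.yaml -- sketch; 1.2.3.4 stands in for the JobManager host.
# With "localhost" here, only clients on the same machine can connect.
jobmanager.rpc.address: 1.2.3.4
jobmanager.rpc.port: 6123
```

The cluster has to be restarted after changing these values.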
Re: Apache Flink:ProgramInvocationException on Yarn
Hello.

Let me clarify the situation.

1. We are using Flink 0.9.0 for Hadoop 2.7. We connected it to HDFS 2.7.1.
2. Locally, our program is working: once we run Flink via ./start-local.sh,
   we are able to connect and run the createRemoteEnvironment and execute
   methods.
3. Due to our architecture and a basic Flink feature, we want to invoke
   this functionality REMOTELY, with our Java code calling the Flink
   methods from another server.
4. We tried both
   ExecutionEnvironment.createRemoteEnvironment("1.2.3.1", 6123, "TestProj.jar");
   and
   ExecutionEnvironment.createRemoteEnvironment("flink@1.2.3.1", 6123, "TestProj.jar");
   (which is definitely not right, since it should be an IP address) -
   both crash with the "can't reach JobManager" error.

It seems to us that it can be one of 2 issues:
1. Somehow we need to configure Flink to accept connections from the remote
   machine.
2. Flink has a critical showstopper bug that jeopardizes the whole decision
   to use this technology.

Please advise us how we should proceed.

On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger wrote:

> Hi,
>
> in the exception you've posted earlier, you can see the following root
> cause:
>
> Caused by: akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> Path(/user/jobmanager)]
>
> This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
> this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are passing
> FLINK_SERVER_URL as the server hostname (or IP).
> Can you pass the correct hostname when you call
> ExecutionEnvironment.createRemoteEnvironment()?
>
> On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer wrote:
>
> > Hi,
> > I'm currently using Flink 0.9.0, which by Maven supports Hadoop 1.
> > By using flink-clients-0.7.0-hadoop2-incubating.jar with the
> > executePlan(Plan p) method instead, I'm getting the same exception.
> >
> > Hanan
> >
> > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer wrote:
> >
> > > Hi,
> > >
> > > 1. I have restarted the Flink service via stop/start-local.sh - it
> > > has been restarted successfully, no errors in the log folder.
> > > 2. The default Flink port is 6123.
> > >
> > > Getting this via the Eclipse IDE:
> > >
> > > Thanks
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > resolve JobManager
> > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > at Test.main(Test.java:39)
> > > Caused by: java.io.IOException: JobManager at
> > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable.
> > > Please make sure that the JobManager is running and its port is
> > > reachable.
> > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > ... 7 more
> > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > Path(/user/jobmanager)]
> > > at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.pr
Re: Apache Flink:ProgramInvocationException on Yarn
Hi,

I'm running with a formal server IP, but for security reasons I can't share
the real IP with you. I put "FLINK_SERVER_URL" in place of the actual IP
only in my post.

Hanan Meyer

On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger wrote:

> Hi,
>
> in the exception you've posted earlier, you can see the following root
> cause:
>
> Caused by: akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> Path(/user/jobmanager)]
>
> This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks like
> this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are passing
> FLINK_SERVER_URL as the server hostname (or IP).
> Can you pass the correct hostname when you call
> ExecutionEnvironment.createRemoteEnvironment()?
>
> On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer wrote:
>
> > Hi,
> > I'm currently using Flink 0.9.0, which by Maven supports Hadoop 1.
> > By using flink-clients-0.7.0-hadoop2-incubating.jar with the
> > executePlan(Plan p) method instead, I'm getting the same exception.
> >
> > Hanan
> >
> > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer wrote:
> >
> > > Hi,
> > >
> > > 1. I have restarted the Flink service via stop/start-local.sh - it
> > > has been restarted successfully, no errors in the log folder.
> > > 2. The default Flink port is 6123.
> > >
> > > Getting this via the Eclipse IDE:
> > >
> > > Thanks
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: Failed to
> > > resolve JobManager
> > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > at Test.main(Test.java:39)
> > > Caused by: java.io.IOException: JobManager at
> > > akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable.
> > > Please make sure that the JobManager is running and its port is
> > > reachable.
> > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > ... 7 more
> > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > Path(/user/jobmanager)]
> > > at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > at akka.dispatch.BatchingExecutor$class.execute(BatchingE
Re: Apache Flink:ProgramInvocationException on Yarn
Hi,

I'm currently using Flink 0.9.0, which by Maven supports Hadoop 1.
By using flink-clients-0.7.0-hadoop2-incubating.jar with the
executePlan(Plan p) method instead, I'm getting the same exception.

Hanan

On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer wrote:

> Hi,
>
> 1. I have restarted the Flink service via stop/start-local.sh - it has
> been restarted successfully, no errors in the log folder.
> 2. The default Flink port is 6123.
>
> Getting this via the Eclipse IDE:
>
> Thanks
>
> org.apache.flink.client.program.ProgramInvocationException: Failed to
> resolve JobManager
> at org.apache.flink.client.program.Client.run(Client.java:379)
> at org.apache.flink.client.program.Client.run(Client.java:356)
> at org.apache.flink.client.program.Client.run(Client.java:349)
> at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> at Test.main(Test.java:39)
> Caused by: java.io.IOException: JobManager at
> akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable.
> Please make sure that the JobManager is running and its port is reachable.
> at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> at org.apache.flink.client.program.Client.run(Client.java:376)
> ... 7 more
> Caused by: akka.actor.ActorNotFound: Actor not found for:
> ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> Path(/user/jobmanager)]
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorke
Re: Apache Flink:ProgramInvocationException on Yarn
On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen wrote:

> If you start the job via the "bin/flink" script, then simply use
> "ExecutionEnvironment.getExecutionEnvironment()" rather than creating a
> remote environment manually.
>
> That way, hosts and ports are configured automatically.
>
> On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger wrote:
>
> > Hi,
> >
> > Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > Every time you deploy Flink on YARN, the host and port change, because
> > the JobManager is started on a different YARN container.
> >
> > On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer wrote:
> >
> > > Hello All,
> > >
> > > When using the Eclipse IDE to submit Flink to a YARN single-node
> > > cluster, I'm getting:
> > > "org.apache.flink.client.program.ProgramInvocationException: Failed
> > > to resolve JobManager"
> > >
> > > Using Flink 0.9.0.
> > >
> > > The jar copies a file from one location in HDFS to another and works
> > > fine while executed locally on the single-node YARN cluster:
> > >
> > >     bin/flink run -c Test ./examples/MyJar.jar
> > >         hdfs://localhost:9000/flink/in.txt hdfs://localhost:9000/flink/out.txt
> > >
> > > The code skeleton:
> > >
> > >     ExecutionEnvironment envRemote =
> > >         ExecutionEnvironment.createRemoteEnvironment(FLINK_SERVER_URL, FLINK_PORT, JAR_PATH_ON_CLIENT);
> > >     DataSet<String> data =
> > >         envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > >     data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > >     envRemote.execute();
> > >
> > > Please advise,
> > >
> > > Hanan Meyer
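Since the YARN-deployed JobManager's host and port change on every deployment, the values for a remote environment have to be read off the session output each time. A small shell sketch of extracting them from a sample line (the hostname and port are taken from the example session output quoted elsewhere in this thread, not from a live cluster):

```shell
# Extract the JobManager host and port from the line the YARN session prints.
# Sample line copied from the session output shown in this thread.
line="Flink JobManager is now running on quickstart.cloudera:39956"
addr=${line##* }    # last whitespace-separated token: host:port
host=${addr%%:*}    # part before the colon
port=${addr##*:}    # part after the colon
echo "$host $port"
```

Those two values are what would then be passed to createRemoteEnvironment instead of a hard-coded host and 6123.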
Apache Flink:ProgramInvocationException on Yarn
Hello All,

When using the Eclipse IDE to submit Flink to a YARN single-node cluster,
I'm getting:
"org.apache.flink.client.program.ProgramInvocationException: Failed to
resolve JobManager"

Using Flink 0.9.0.

The jar copies a file from one location in HDFS to another and works fine
while executed locally on the single-node YARN cluster:

    bin/flink run -c Test ./examples/MyJar.jar
        hdfs://localhost:9000/flink/in.txt hdfs://localhost:9000/flink/out.txt

The code skeleton:

    ExecutionEnvironment envRemote =
        ExecutionEnvironment.createRemoteEnvironment(FLINK_SERVER_URL, FLINK_PORT, JAR_PATH_ON_CLIENT);
    DataSet<String> data =
        envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
    data.writeAsText("hdfs://localhost:9000/flink/out.txt");
    envRemote.execute();

Please advise,

Hanan Meyer