Hello all.

First, thank you for your valuable advice.
We ran some very targeted pinpoint tests and came to the following conclusions:

1. We run Flink for Hadoop 2.7 on Ubuntu 14.
2. Once we copy our Java client program directly to that machine and run it
there, it works fine.
The program is:

.....

ExecutionEnvironment envRemote =
    ExecutionEnvironment.createRemoteEnvironment("localhost", 6123,
        "/usr/local/HananTestProj.jar");

// fromElements builds a small in-memory DataSet for testing
org.apache.flink.api.java.DataSet<String> text = envRemote.fromElements(
    "Who's there?",
    "I think I hear them. Stand, ho! Who's there?");

org.apache.flink.api.java.DataSet<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .groupBy(0)  // group by the word (tuple field 0)
    .sum(1);     // sum the per-word counts (tuple field 1)

        wordCounts.print();
    }

    public static class LineSplitter
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

.....
The program works fine.
3. Now we are trying to run this program remotely, from a Windows machine,
where the first line looks different:

ExecutionEnvironment envRemote =
    ExecutionEnvironment.createRemoteEnvironment("1.2.3.4", 6123,
        "C:\\HananTestProj.jar");

where 1.2.3.4 is the IP address of the Flink machine.

4. We got an exception: the JobManager at 1.2.3.4 can't be reached (full text omitted).

5. In the Flink configuration we found the following line:
jobmanager.rpc.address: localhost
Flink cannot be started with any value other than localhost for this setting
(we tried both the hostname and the IP address).
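
For reference, this is the kind of change we attempted in conf/flink-conf.yaml
(a sketch - 1.2.3.4 stands in for the Flink machine's actual address, and the
port is left at the default):

jobmanager.rpc.address: 1.2.3.4
jobmanager.rpc.port: 6123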


6. We therefore suspect that Flink has a critical bug: it cannot be reached
from a remote machine, only locally. Are we right? Are we wrong? Should we
file a JIRA issue?
Or maybe we need to configure Flink differently somehow?
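
For what it's worth, a quick way to double-check basic reachability from the
Windows machine (assuming a telnet client is available there) would be:

telnet 1.2.3.4 6123

If that fails to connect, a firewall or the localhost-only binding above
would explain the "JobManager can't be reached" error.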

Please advise.
Best regards



On Sun, Aug 30, 2015 at 3:19 PM, Robert Metzger <rmetz...@apache.org> wrote:

> The output of the YARN session should look like this:
>
> Flink JobManager is now running on quickstart.cloudera:39956
> JobManager Web Interface:
> http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/
> Number of connected TaskManagers changed to 1. Slots available: 1
>
>
>
>
> On Sun, Aug 30, 2015 at 11:12 AM, Stephan Ewen <se...@apache.org> wrote:
>
> > The only thing I can think of is that you are not using the right
> > host/port for the JobManager.
> >
> > When you start the YARN session, it should print the host where the
> > JobManager runs. You also need to take the port from there, as in YARN,
> > the port is usually not 6123. YARN starts many services on one machine,
> > so the ports need to be randomized.
> >
> > It may be worth adding a YARNExecutionEnvironment at some point, which
> > deals with this transparently (starting the YARN cluster, connecting to
> > the JobManager).
> >
> > On Sun, Aug 30, 2015 at 10:12 AM, Hanan Meyer <ha...@scalabill.it>
> wrote:
> >
> > > Hello.
> > > Let me clarify the situation.
> > > 1. We are using Flink 0.9.0 for Hadoop 2.7. We connected it to HDFS
> > > 2.7.1.
> > > 2. Locally, our program works: once we run Flink via ./start-local.sh,
> > > we are able to connect and run the createRemoteEnvironment and execute
> > > methods.
> > > 3. Due to our architecture, and because this is a basic Flink feature,
> > > we want to invoke this functionality REMOTELY, with our Java code
> > > calling the Flink methods from another server.
> > > 4. We tried both
> > > ExecutionEnvironment.createRemoteEnvironment("1.2.3.1", 6123,
> > > "TestProj.jar"); and
> > > ExecutionEnvironment.createRemoteEnvironment("flink@1.2.3.1", 6123,
> > > "TestProj.jar"); (the latter is definitely not right, since it should
> > > be an IP address) - both crash with the "can't reach JobManager" error.
> > >
> > > It seems to us that it must be one of two issues:
> > > 1. Somehow we need to configure Flink to accept connections from the
> > > remote machine.
> > > 2. Flink has a critical showstopper bug that jeopardizes the whole
> > > decision to use this technology.
> > >
> > > Please advise us how we should proceed.
> > >
> > >
> > >
> > >
> > > On Fri, Aug 28, 2015 at 10:27 AM, Robert Metzger <rmetz...@apache.org>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > in the exception you've posted earlier, you can see the following
> > > > root cause:
> > > >
> > > > Caused by: akka.actor.ActorNotFound: Actor not found for:
> > > > ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/),
> > > > Path(/user/jobmanager)]
> > > >
> > > > This string "akka.tcp://flink@FLINK_SERVER_URL:6123/" usually looks
> > > > like this: "akka.tcp://flink@1.2.3.4:6123/". So it seems that you are
> > > > passing FLINK_SERVER_URL as the server hostname (or IP).
> > > > Can you pass the correct hostname when you call
> > > > ExecutionEnvironment.createRemoteEnvironment()?
> > > >
> > > > On Fri, Aug 28, 2015 at 7:52 AM, Hanan Meyer <ha...@scalabill.it>
> > wrote:
> > > >
> > > > > Hi
> > > > > I'm currently using Flink 0.9.0, which via Maven supports Hadoop 1.
> > > > > When using flink-clients-0.7.0-hadoop2-incubating.jar with the
> > > > > executePlan(Plan p) method instead, I get the same exception.
> > > > >
> > > > > Hanan
> > > > >
> > > > > On Fri, Aug 28, 2015 at 8:35 AM, Hanan Meyer <ha...@scalabill.it>
> > > wrote:
> > > > >
> > > > > >
> > > > > > Hi
> > > > > >
> > > > > > 1. I have restarted the Flink service via stop/start-local.sh - it
> > > > > > restarted successfully, with no errors in the log folder.
> > > > > > 2. The default Flink port is 6123.
> > > > > >
> > > > > > Getting this via the Eclipse IDE:
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > >
> > > > > > org.apache.flink.client.program.ProgramInvocationException: Failed to resolve JobManager
> > > > > > at org.apache.flink.client.program.Client.run(Client.java:379)
> > > > > > at org.apache.flink.client.program.Client.run(Client.java:356)
> > > > > > at org.apache.flink.client.program.Client.run(Client.java:349)
> > > > > > at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > > > > > at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > > > > > at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > > > > > at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > > > > > at Test.main(Test.java:39)
> > > > > > Caused by: java.io.IOException: JobManager at akka.tcp://flink@FLINK_SERVER_URL:6123/user/jobmanager not reachable. Please make sure that the JobManager is running and its port is reachable.
> > > > > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1197)
> > > > > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1221)
> > > > > > at org.apache.flink.runtime.jobmanager.JobManager$.getJobManagerRemoteReference(JobManager.scala:1239)
> > > > > > at org.apache.flink.runtime.jobmanager.JobManager.getJobManagerRemoteReference(JobManager.scala)
> > > > > > at org.apache.flink.client.program.Client.run(Client.java:376)
> > > > > > ... 7 more
> > > > > > Caused by: akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@FLINK_SERVER_URL:6123/), Path(/user/jobmanager)]
> > > > > > at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
> > > > > > at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
> > > > > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> > > > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
> > > > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
> > > > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
> > > > > > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> > > > > > at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
> > > > > > at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
> > > > > > at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
> > > > > > at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
> > > > > > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
> > > > > > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
> > > > > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
> > > > > > at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
> > > > > > at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
> > > > > > at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
> > > > > > at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
> > > > > > at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
> > > > > > at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
> > > > > > at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
> > > > > > at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
> > > > > > at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
> > > > > > at akka.actor.ActorCell.terminate(ActorCell.scala:369)
> > > > > > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
> > > > > > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
> > > > > > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
> > > > > > at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> > > > > > at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > > > > > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > > > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> > > > > > at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> > > > > > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > > > at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > >
> > > > > >
> > > > > > On Thu, Aug 27, 2015 at 10:47 PM, Robert Metzger <
> > > rmetz...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > >> I guess you are getting a full exception trace after the
> > > > > >> "org.apache.flink.client.program.ProgramInvocationException:
> > > > > >> Failed to resolve JobManager".
> > > > > >> Can you post it here to help us understand the issue?
> > > > > >>
> > > > > >> On Thu, Aug 27, 2015 at 6:55 PM, Alexey Sapozhnikov <
> > > > > ale...@scalabill.it>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hello all.
> > > > > >> >
> > > > > >> > Some clarification: locally everything works great.
> > > > > >> > However, once we run Flink on a remote Linux machine and try to
> > > > > >> > run the client program from our machine using
> > > > > >> > createRemoteEnvironment, the Flink JobManager raises this
> > > > > >> > exception.
> > > > > >> >
> > > > > >> > On Thu, Aug 27, 2015 at 7:41 PM, Stephan Ewen <se...@apache.org>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > If you start the job via the "bin/flink" script, then simply
> > > > > >> > > use "ExecutionEnvironment.getExecutionEnvironment()" rather
> > > > > >> > > than creating a remote environment manually.
> > > > > >> > >
> > > > > >> > > That way, hosts and ports are configured automatically.
> > > > > >> > >
> > > > > >> > > On Thu, Aug 27, 2015 at 6:39 PM, Robert Metzger
> > > > > >> > > <rmetz...@apache.org> wrote:
> > > > > >> > >
> > > > > >> > >> Hi,
> > > > > >> > >>
> > > > > >> > >> Which values did you use for FLINK_SERVER_URL and FLINK_PORT?
> > > > > >> > >> Every time you deploy Flink on YARN, the host and port change,
> > > > > >> > >> because the JobManager is started on a different YARN
> > > > > >> > >> container.
> > > > > >> > >>
> > > > > >> > >>
> > > > > >> > >> On Thu, Aug 27, 2015 at 6:32 PM, Hanan Meyer
> > > > > >> > >> <ha...@scalabill.it> wrote:
> > > > > >> > >>
> > > > > >> > >> > Hello All
> > > > > >> > >> >
> > > > > >> > >> > When using the Eclipse IDE to submit Flink to a YARN
> > > > > >> > >> > single-node cluster, I'm getting:
> > > > > >> > >> >
> > > > > >> > >> > "org.apache.flink.client.program.ProgramInvocationException:
> > > > > >> > >> > Failed to resolve JobManager"
> > > > > >> > >> >
> > > > > >> > >> > Using Flink 0.9.0.
> > > > > >> > >> >
> > > > > >> > >> > The JAR copies a file from one location in HDFS to another
> > > > > >> > >> > and works fine when executed locally on the single-node YARN
> > > > > >> > >> > cluster:
> > > > > >> > >> > bin/flink run -c Test ./examples/MyJar.jar
> > > > > >> > >> > hdfs://localhost:9000/flink/in.txt
> > > > > >> > >> > hdfs://localhost:9000/flink/out.txt
> > > > > >> > >> >
> > > > > >> > >> > The code skeleton:
> > > > > >> > >> >
> > > > > >> > >> > ExecutionEnvironment envRemote =
> > > > > >> > >> >     ExecutionEnvironment.createRemoteEnvironment(
> > > > > >> > >> >         FLINK_SERVER_URL, FLINK_PORT, JAR_PATH_ON_CLIENT);
> > > > > >> > >> > DataSet<String> data =
> > > > > >> > >> >     envRemote.readTextFile("hdfs://localhost:9000/flink/in.txt");
> > > > > >> > >> > data.writeAsText("hdfs://localhost:9000/flink/out.txt");
> > > > > >> > >> > envRemote.execute();
> > > > > >> > >> >
> > > > > >> > >> >
> > > > > >> > >> > Please advise,
> > > > > >> > >> >
> > > > > >> > >> > Hanan Meyer
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
