You can use "bin/flink cancel JOBID" or JobManager WebUI to cancel the
running job.

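For example, using the job ID that shows up in your JobManager log (the
"list" sub-command shows the running jobs and their IDs; exact flags may
differ slightly between Flink versions):

  bin/flink list -r
  bin/flink cancel 6a4b9aa01ec87db20060210e5b36065e
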
The exception you see occurs in FlinkSubmitter.killTopology(...), which
is not used by "bin/flink cancel" or the JobManager WebUI.

If you compile the example yourself, just remove the call to
killTopology().

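For reference, a minimal sketch of that change; the code around the kill
call follows the usual Storm submitter pattern and is assumed here, not
copied from StormWordCountRemoteBySubmitter:

    // Sketch only -- the exact surrounding code and signatures are assumed.
    FlinkSubmitter.submitTopology(topologyId, conf, builder.createTopology());

    // The example then sleeps ~5 seconds and kills the job, roughly:
    //   Utils.sleep(5 * 1000);
    //   FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
    // Remove (or comment out) those lines and cancel the job later with
    // "bin/flink cancel JOBID" or the JobManager WebUI instead.
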
-Matthias

On 09/01/2015 11:16 PM, Matthias J. Sax wrote:
> Oh yes. I forgot about this. I already have a fix for it in a pending
> pull request... I hope that this PR is merged soon...
> 
> If you want to observe the progress, look here:
> https://issues.apache.org/jira/browse/FLINK-2111
> and
> https://issues.apache.org/jira/browse/FLINK-2338
> 
> This PR resolves both and fixes the problem you observed:
> https://github.com/apache/flink/pull/750
> 
> -Matthias
> 
> 
> On 09/01/2015 11:09 PM, Jerry Peng wrote:
>> Hello,
>>
>> I corrected the number of slots for each task manager but now when I try
>> to run the WordCount-StormTopology, the job manager daemon on my master
>> node crashes and I get this exception in the log:
>>
>> java.lang.Exception: Received a message CancelJob(6a4b9aa01ec87db20060210e5b36065e) without a leader session ID, even though the message requires a leader session ID.
>>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:41)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> It seems to have something to do with canceling the topology after
>> the sleep. Any ideas?
>>
>>
>> Best,
>>
>>
>> Jerry
>>
>>
>> On Tue, Sep 1, 2015 at 3:33 PM, Matthias J. Sax
>> <mj...@informatik.hu-berlin.de> wrote:
>>
>>     Yes. That is what I expected.
>>
>>     The JobManager cannot start the job due to too few task slots. It logs
>>     a NoResourceAvailableException (not shown on stdout; see the "log"
>>     folder). There is no feedback to the Flink CLI that the job could not
>>     be started.
>>
>>     Furthermore, WordCount-StormTopology sleeps for 5 seconds and then
>>     tries to "kill" the job. However, because the job was never started,
>>     this results in a NotAliveException, which is printed to stdout.
>>
>>     -Matthias
>>
>>
>>
>>     On 09/01/2015 10:26 PM, Jerry Peng wrote:
>>     > When I run WordCount-StormTopology I get the following exception:
>>     >
>>     > ~/flink/bin/flink run WordCount-StormTopology.jar
>>     > hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/data.txt
>>     > hdfs:///home/jerrypeng/hadoop/hadoop_dir/data/results.txt
>>     >
>>     > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:452)
>>     >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>     >     at org.apache.flink.client.program.Client.run(Client.java:278)
>>     >     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:631)
>>     >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:319)
>>     >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:954)
>>     >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1004)
>>     > Caused by: NotAliveException(msg:null)
>>     >     at org.apache.flink.stormcompatibility.api.FlinkClient.killTopologyWithOpts(FlinkClient.java:209)
>>     >     at org.apache.flink.stormcompatibility.api.FlinkClient.killTopology(FlinkClient.java:203)
>>     >     at org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter.main(StormWordCountRemoteBySubmitter.java:80)
>>     >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     >     at java.lang.reflect.Method.invoke(Method.java:483)
>>     >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>     >     ... 6 more
>>     >
>>     >
>>     > The exception above occurred while trying to run your command.
>>     >
>>     >
>>     > Any idea how to fix this?
>>     >
>>     > On Tue, Sep 1, 2015 at 3:10 PM, Matthias J. Sax
>>     > <mj...@informatik.hu-berlin.de> wrote:
>>     >
>>     >     Hi Jerry,
>>     >
>>     >     WordCount-StormTopology uses a hard-coded dop of 4. If you start
>>     >     up Flink in local mode (bin/start-local-streaming.sh), you need
>>     >     to increase the number of task slots to at least 4 in
>>     >     conf/flink-conf.yaml (taskmanager.numberOfTaskSlots) before
>>     >     starting Flink.
>>     >
>>     >     You should actually see the following exception in
>>     >     log/flink-...-jobmanager-...log
>>     >
>>     >     > NoResourceAvailableException: Not enough free slots available to
>>     >     > run the job. You can decrease the operator parallelism or increase
>>     >     > the number of slots per TaskManager in the configuration.
>>     >
>>     >     WordCount-StormTopology does use StormWordCountRemoteBySubmitter
>>     >     internally. So, you do use it already ;)
>>     >
>>     >     I am not sure what you mean by "get rid of KafkaSource"; it is
>>     >     still in the code base. Which version do you use? In
>>     >     flink-0.10-SNAPSHOT it is located in the submodule
>>     >     "flink-connector-kafka" (which is a submodule of
>>     >     "flink-streaming-connector-parent", which is a submodule of
>>     >     "flink-streaming-parent").
>>     >
>>     >
>>     >     -Matthias
>>     >
>>     >
>>     >     On 09/01/2015 09:40 PM, Jerry Peng wrote:
>>     >     > Hello,
>>     >     >
>>     >     > I have some questions regarding how to run one of the
>>     >     > flink-storm-examples, the WordCountTopology. How should I run
>>     >     > the job? On GitHub it says I should just execute
>>     >     > bin/flink run example.jar, but when I execute:
>>     >     >
>>     >     > bin/flink run WordCount-StormTopology.jar
>>     >     >
>>     >     > nothing happens. What am I doing wrong? And how can I run the
>>     >     > WordCountTopology via StormWordCountRemoteBySubmitter?
>>     >     >
>>     >     > Also, why did you guys get rid of the KafkaSource class? What
>>     >     > is the API now for subscribing to a Kafka source?
>>     >     >
>>     >     > Best,
>>     >     >
>>     >     > Jerry
>>     >
>>     >
>>
>>
> 
