Hi XiangWei,

Programmatically, there is no nice tooling yet to cancel jobs on a dedicated
cluster. What you can do is use Flink's REST API to issue a cancel
command [1]. You have to send a GET request to the target URL
`/jobs/:jobid/cancel`. In the future we will improve the programmatic job
control, which will allow you to do these kinds of things more easily.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation

Cheers,
Till

On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw.huang...@gmail.com>
wrote:

> Hi Till,
>
>      Thanks for your answer, it worked when I use *StandaloneMiniCluster*,
> but another problem is that I can’t find a way to cancel
> a running Flink job without shutting down the cluster. For
> LocalFlinkMiniCluster I can do it with the code below:
>
> *   for (job <- cluster.getCurrentlyRunningJobsJava()) {*
>
> *      cluster.stopJob(job)   }*
>
>    Is it possible to cancel a running Flink job without shutting down a 
> *StandaloneMiniCluster
> ?*
>
> Best Regards,
> XiangWei
>
>
>
> 在 2017年9月14日,16:58,Till Rohrmann <trohrm...@apache.org> 写道:
>
> Hi XiangWei,
>
> The problem is that the LocalFlinkMiniCluster can no longer be used in
> combination with a RemoteExecutionEnvironment. The reason is that the
> LocalFlinkMiniCluster now uses an internal leader election service and
> assigns leader ids to its components. Since this is an internal service, it
> is not possible to retrieve this information, as is the case with the
> ZooKeeper-based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a
> LocalFlinkMiniCluster and would have to be fixed to work properly
> together with a local execution environment. Until then, I recommend
> starting a local standalone cluster and letting the code run there.
>
> Cheers,
> Till
> ​
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang...@gmail.com>
> wrote:
>
>> dear all,
>>
>> *Below is the code i execute:*
>>
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, Inte
>> rpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions
>> , Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniC
>> luster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>>   private var bufferedReader: Option[BufferedReader] = None
>>   private var jprintWriter: JPrintWriter = _
>>   private val config = new Configuration;
>>   private var cluster: LocalFlinkMiniCluster = _
>>   @BeanProperty var imain: IMain = _
>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>   private var out: ByteBufOutputStream = null
>>   private var outBuf: ByteBuf = null
>>   private var in: ByteBufInputStream = _
>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>>   override def isOpen: Boolean = {
>>     isRunning.get()
>>   }
>>
>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>     config.toMap.toMap.foreach(println)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>>     val localCluster = new LocalFlinkMiniCluster(config, false)
>>     localCluster.start(true)
>>     val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.
>> get.head).port
>>     println(s"Starting local Flink cluster (host:
>> localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>   }
>>
>>
>>   /**
>>    * Start flink cluster and create interpreter
>>    */
>>   override def open: Unit = {
>>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>>     out = new ByteBufOutputStream(outBuf)
>>     in = new ByteBufInputStream(outBuf)
>>     //    val (host, port, yarnCluster) = 
>> deployNewYarnCluster(YarnConfig(Option(1),
>> None, None, None, Option(1), None))
>>     val (host, port, localCluster) = startLocalMiniCluster()
>>     this.cluster = localCluster
>>     val conf = cluster.configuration
>>     println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>>     flinkILoop = new FlinkILoop(host, port, conf, None)
>>     val settings = new Settings()
>>     settings.usejavacp.value = true
>>     settings.Yreplsync.value = true
>>     flinkILoop.settings_$eq(settings)
>>     flinkILoop.createInterpreter()
>>     imain = flinkILoop.intp
>>     FlinkInterpreter.ourClassloader = imain.classLoader
>>     val benv = flinkILoop.scalaBenv
>>     val senv = flinkILoop.scalaSenv
>>     benv.getConfig.disableSysoutLogging()
>>     senv.getConfig.disableSysoutLogging()
>>     // import libraries
>>     imain.interpret("import scala.tools.nsc.io._")
>>     //    imain.interpret("import Properties.userHome")
>>     imain.interpret("import scala.compat.Platform.EOL")
>>     imain.interpret("import org.apache.flink.api.scala._")
>>     imain.interpret("import org.apache.flink.api.common.functions._")
>>     isRunning.set(true)
>>   }
>>
>>   override def interpret(line: String): InterpreterResult = {
>>     if (line == null || line.trim.length == 0) {
>>       return new InterpreterResult(Code.SUCCESS)
>>     }
>>     interpret(line.split("\n"))
>>   }
>>
>>   /**
>>    * Interprete code
>>    * @param lines
>>    * @return
>>    */
>>   def interpret(lines: Array[String]): InterpreterResult = {
>>     val imain: IMain = getImain
>>     val linesToRun: Array[String] = new Array[String](lines.length + 1)
>>     for (i <- 0 until lines.length) {
>>       linesToRun(i) = lines(i)
>>     }
>>     linesToRun(lines.length) = "print(\"\")"
>>     System.setOut(new PrintStream(out))
>>     out.buffer().clear()
>>     var r: Code = null
>>     var incomplete: String = ""
>>     var inComment: Boolean = false
>>     for (l <- 0 until linesToRun.length) {
>>       val s: String = linesToRun(l)
>>       var continuation: Boolean = false
>>       if (l + 1 < linesToRun.length) {
>>         val nextLine: String = linesToRun(l + 1).trim
>>         if (nextLine.isEmpty ||
>>             nextLine.startsWith("//") ||
>>             nextLine.startsWith("}") ||
>>             nextLine.startsWith("object")) {
>>           continuation = true
>>         } else if (!inComment && nextLine.startsWith("/*")) {
>>           inComment = true
>>           continuation = true
>>         } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>>           inComment = false
>>           continuation = true
>>         } else if (nextLine.length > 1 &&
>>             nextLine.charAt(0) == '.' &&
>>             nextLine.charAt(1) != '.' &&
>>             nextLine.charAt(1) != '/') {
>>           continuation = true
>>         } else if (inComment) {
>>           continuation = true
>>         }
>>         if (continuation) {
>>           incomplete += s + "\n"
>>         }
>>       }
>>       if (!continuation) {
>>         val currentCommand: String = incomplete
>>         var res: Results.Result = null
>>         try {
>>           res = Console.withOut(System.out)(ne
>> w AbstractFunction0[Results.Result] {
>>             override def apply() = {
>>               imain.interpret(currentCommand + s)
>>             }
>>           }.apply())
>>         } catch {
>>           case e: Exception =>
>>             logError("Interpreter Exception ", e)
>>             return new InterpreterResult(Code.ERR
>> OR, InterpreterUtils.getMostRelevantMessage(e))
>>         }
>>         r = getResultCode(res)
>>         if (r == Code.ERROR) {
>>           return new InterpreterResult(r, out.toString)
>>         } else if (r eq Code.INCOMPLETE) {
>>           incomplete += s + "\n"
>>         } else {
>>           incomplete = ""
>>         }
>>       }
>>     }
>>
>>     if (r eq Code.INCOMPLETE) {
>>       return new InterpreterResult(r, "Incomplete expression")
>>     }
>>     else {
>>       return new InterpreterResult(r, out.buffer().toString(Charset.forNa
>> me("utf-8")))
>>     }
>>   }
>>
>>   private def getResultCode(r: Results.Result): Code = {
>>     if (r.isInstanceOf[Results.Success.type]) {
>>       return Code.SUCCESS
>>     }
>>     else if (r.isInstanceOf[Results.Incomplete.type]) {
>>       return Code.INCOMPLETE
>>     }
>>     else {
>>       return Code.ERROR
>>     }
>>   }
>>
>>   }
>> }
>>
>> object FlinkInterpreter extends Logging {
>>   var ourClassloader: ClassLoader = _
>>
>>   def main(args: Array[String]): Unit = {
>>     val interpreter: FlinkInterpreter = new FlinkInterpreter
>>     val code =
>>       """
>>         |val dataStream = senv.fromElements(1,2,3,4,5)
>>         |dataStream.countWindowAll(2).sum(0).print()
>>         |senv.execute("My streaming program")
>>       """.stripMargin
>>     interpreter.open
>>     val result = interpreter.interpret(code)
>>   }
>> }
>>
>> *The error messages i got are:*
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
>> Discard message LeaderSessionMessage(00000000-
>> 0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
>> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES))
>> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7
>> did not equal the received leader session ID 00000000-0000-0000-0000-000000
>> 000000.
>> [INFO] [17/09/13 12:05:52] 
>> [org.apache.flink.runtime.client.JobSubmissionClientActor]
>> Terminate JobClientActor.
>> [INFO] [17/09/13 12:05:52] 
>> [org.apache.flink.runtime.client.JobSubmissionClientActor]
>> Disconnect from JobManager Actor[akka.tcp://flink@localho
>> st:63522/user/jobmanager#82627940].
>> [INFO] [17/09/13 12:05:52] 
>> [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Shutting down remote daemon.
>> [INFO] [17/09/13 12:05:52] 
>> [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Remote daemon shut down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52] 
>> [akka.remote.RemoteActorRefProvider$RemotingTerminator]
>> Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Couldn't retrieve the JobExecutionResult from the
>> JobManager.
>>   at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:478)
>>   at org.apache.flink.client.program.StandaloneClusterClient.subm
>> itJob(StandaloneClusterClient.java:105)
>>   at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:442)
>>   at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:434)
>>   at org.apache.flink.streaming.api.environment.RemoteStreamEnvir
>> onment.executeRemotely(RemoteStreamEnvironment.java:212)
>>   at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.
>> executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>>   at org.apache.flink.streaming.api.environment.RemoteStreamEnvir
>> onment.execute(RemoteStreamEnvironment.java:176)
>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironm
>> ent.execute(StreamExecutionEnvironment.scala:638)
>>   ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>> Couldn't retrieve the JobExecutionResult from the JobManager.
>>   at org.apache.flink.runtime.client.JobClient.awaitJobResult(
>> JobClient.java:309)
>>   at org.apache.flink.runtime.client.JobClient.submitJobAndWait(
>> JobClient.java:396)
>>   at org.apache.flink.client.program.ClusterClient.run(ClusterCli
>> ent.java:467)
>>   ... 41 more
>> Caused by: 
>> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> Job submission to the JobManager timed out. You may increase
>> 'akka.client.timeout' in case the JobManager needs more time to configure
>> and confirm the job submission.
>>   at org.apache.flink.runtime.client.JobSubmissionClientActor.han
>> dleCustomMessage(JobSubmissionClientActor.java:119)
>>   at org.apache.flink.runtime.client.JobClientActor.handleMessage
>> (JobClientActor.java:251)
>>   at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeader
>> SessionID(FlinkUntypedActor.java:89)
>>   at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(Fl
>> inkUntypedActor.java:68)
>>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(Untyp
>> edActor.scala:167)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.
>> exec(AbstractDispatcher.scala:397)
>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>>
>>
>>
>>
>>
>
>

Reply via email to