Hi Till,
   
     Thanks for your answer. It worked when I used StandaloneMiniCluster, but 
another problem is that I can’t find a way to cancel
a running Flink job without shutting down the cluster. With LocalFlinkMiniCluster 
I can do it with the code below:

   for (job <- cluster.getCurrentlyRunningJobsJava()) {
      cluster.stopJob(job)
   }

   Is it possible to cancel a running Flink job without shutting down a 
StandaloneMiniCluster?

Best Regards,
XiangWei



> 在 2017年9月14日,16:58,Till Rohrmann <trohrm...@apache.org> 写道:
> 
> Hi XiangWei,
> 
> the problem is that the LocalFlinkMiniCluster can no longer be used in 
> combination with a RemoteExecutionEnvironment. The reason is that the 
> LocalFlinkMiniCluster now uses an internal leader election service and 
> assigns leader ids to its components. Since this is an internal service, it is 
> not possible to retrieve this information as is the case with the 
> ZooKeeper based leader election services.
> 
> Long story short, the Flink Scala shell currently does not work with a 
> LocalFlinkMiniCluster and would have to be fixed to work properly together 
> with a local execution environment. Until then, I recommend starting a local 
> standalone cluster and letting the code run there.
> 
> Cheers,
> Till
> 
> 
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang...@gmail.com 
> <mailto:xw.huang...@gmail.com>> wrote:
> Dear all,
> 
> Below is the code I execute:
> 
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
> 
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
> InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
> ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
> LocalFlinkMiniCluster}
> 
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
> 
> class FlinkInterpreter extends Interpreter {
>   private var bufferedReader: Option[BufferedReader] = None
>   private var jprintWriter: JPrintWriter = _
>   private val config = new Configuration;
>   private var cluster: LocalFlinkMiniCluster = _
>   @BeanProperty var imain: IMain = _
>   @BeanProperty var flinkILoop: FlinkILoop = _
>   private var out: ByteBufOutputStream = null
>   private var outBuf: ByteBuf = null
>   private var in: ByteBufInputStream = _
>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
> 
>   override def isOpen: Boolean = {
>     isRunning.get()
>   }
> 
>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>     config.toMap.toMap.foreach(println)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>     val localCluster = new LocalFlinkMiniCluster(config, false)
>     localCluster.start(true)
>     val port = 
> AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>     println(s"Starting local Flink cluster (host: localhost,port: 
> ${localCluster.getLeaderRPCPort}).\n")
>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>   }
> 
>  
>   /**
>    * Start flink cluster and create interpreter
>    */
>   override def open: Unit = {
>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>     out = new ByteBufOutputStream(outBuf)
>     in = new ByteBufInputStream(outBuf)
>     //    val (host, port, yarnCluster) = 
> deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>     val (host, port, localCluster) = startLocalMiniCluster()
>     this.cluster = localCluster
>     val conf = cluster.configuration
>     println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>     flinkILoop = new FlinkILoop(host, port, conf, None)
>     val settings = new Settings()
>     settings.usejavacp.value = true
>     settings.Yreplsync.value = true
>     flinkILoop.settings_$eq(settings)
>     flinkILoop.createInterpreter()
>     imain = flinkILoop.intp
>     FlinkInterpreter.ourClassloader = imain.classLoader
>     val benv = flinkILoop.scalaBenv
>     val senv = flinkILoop.scalaSenv
>     benv.getConfig.disableSysoutLogging()
>     senv.getConfig.disableSysoutLogging()
>     // import libraries
>     imain.interpret("import scala.tools.nsc.io._")
>     //    imain.interpret("import Properties.userHome")
>     imain.interpret("import scala.compat.Platform.EOL")
>     imain.interpret("import org.apache.flink.api.scala._")
>     imain.interpret("import org.apache.flink.api.common.functions._")
>     isRunning.set(true)
>   }
> 
>   override def interpret(line: String): InterpreterResult = {
>     if (line == null || line.trim.length == 0) {
>       return new InterpreterResult(Code.SUCCESS)
>     }
>     interpret(line.split("\n"))
>   }
> 
>   /**
>    * Interprete code
>    * @param lines
>    * @return
>    */
>   def interpret(lines: Array[String]): InterpreterResult = {
>     val imain: IMain = getImain
>     val linesToRun: Array[String] = new Array[String](lines.length + 1)
>     for (i <- 0 until lines.length) {
>       linesToRun(i) = lines(i)
>     }
>     linesToRun(lines.length) = "print(\"\")"
>     System.setOut(new PrintStream(out))
>     out.buffer().clear()
>     var r: Code = null
>     var incomplete: String = ""
>     var inComment: Boolean = false
>     for (l <- 0 until linesToRun.length) {
>       val s: String = linesToRun(l)
>       var continuation: Boolean = false
>       if (l + 1 < linesToRun.length) {
>         val nextLine: String = linesToRun(l + 1).trim
>         if (nextLine.isEmpty ||
>             nextLine.startsWith("//") ||
>             nextLine.startsWith("}") ||
>             nextLine.startsWith("object")) {
>           continuation = true
>         } else if (!inComment && nextLine.startsWith("/*")) {
>           inComment = true
>           continuation = true
>         } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>           inComment = false
>           continuation = true
>         } else if (nextLine.length > 1 &&
>             nextLine.charAt(0) == '.' &&
>             nextLine.charAt(1) != '.' &&
>             nextLine.charAt(1) != '/') {
>           continuation = true
>         } else if (inComment) {
>           continuation = true
>         }
>         if (continuation) {
>           incomplete += s + "\n"
>         }
>       }
>       if (!continuation) {
>         val currentCommand: String = incomplete
>         var res: Results.Result = null
>         try {
>           res = Console.withOut(System.out)(new 
> AbstractFunction0[Results.Result] {
>             override def apply() = {
>               imain.interpret(currentCommand + s)
>             }
>           }.apply())
>         } catch {
>           case e: Exception =>
>             logError("Interpreter Exception ", e)
>             return new InterpreterResult(Code.ERROR, 
> InterpreterUtils.getMostRelevantMessage(e))
>         }
>         r = getResultCode(res)
>         if (r == Code.ERROR) {
>           return new InterpreterResult(r, out.toString)
>         } else if (r eq Code.INCOMPLETE) {
>           incomplete += s + "\n"
>         } else {
>           incomplete = ""
>         }
>       }
>     }
> 
>     if (r eq Code.INCOMPLETE) {
>       return new InterpreterResult(r, "Incomplete expression")
>     }
>     else {
>       return new InterpreterResult(r, 
> out.buffer().toString(Charset.forName("utf-8")))
>     }
>   }
> 
>   private def getResultCode(r: Results.Result): Code = {
>     if (r.isInstanceOf[Results.Success.type]) {
>       return Code.SUCCESS
>     }
>     else if (r.isInstanceOf[Results.Incomplete.type]) {
>       return Code.INCOMPLETE
>     }
>     else {
>       return Code.ERROR
>     }
>   }
> 
>   }
> }
> 
> object FlinkInterpreter extends Logging {
>   var ourClassloader: ClassLoader = _
> 
>   def main(args: Array[String]): Unit = {
>     val interpreter: FlinkInterpreter = new FlinkInterpreter
>     val code =
>       """
>         |val dataStream = senv.fromElements(1,2,3,4,5)
>         |dataStream.countWindowAll(2).sum(0).print()
>         |senv.execute("My streaming program")
>       """.stripMargin
>     interpreter.open
>     val result = interpreter.interpret(code)
>   }
> }
> 
> The error messages I got are:
> …
> …
> ...
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] 
> Discard message 
> LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
>  1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) 
> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 
> did not equal the received leader session ID 
> 00000000-0000-0000-0000-000000000000.
> [INFO] [17/09/13 12:05:52] 
> [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate 
> JobClientActor.
> [INFO] [17/09/13 12:05:52] 
> [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from 
> JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
> [INFO] [17/09/13 12:05:52] 
> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote 
> daemon.
> [INFO] [17/09/13 12:05:52] 
> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut 
> down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] 
> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Couldn't retrieve the JobExecutionResult from the 
> JobManager.
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>   at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>   at 
> org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>   at 
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>   ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
> retrieve the JobExecutionResult from the JobManager.
>   at 
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>   at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>   ... 41 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
> submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to configure and 
> confirm the job submission.
>   at 
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>   at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>   at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>   at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>   at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
> 
> 
> 
> 

Reply via email to