dear all, Below is the code i execute:
import java.io._ import java.net.{URL, URLClassLoader} import java.nio.charset.Charset import java.util.Collections import java.util.concurrent.atomic.AtomicBoolean import com.netease.atom.common.util.logging.Logging import com.netease.atom.interpreter.Code.Code import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils} import io.netty.buffer._ import org.apache.flink.api.scala.FlinkILoop import org.apache.flink.client.CliFrontend import org.apache.flink.client.cli.CliFrontendParser import org.apache.flink.client.program.ClusterClient import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster} import scala.Console import scala.beans.BeanProperty import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.runtime.AbstractFunction0 import scala.tools.nsc.Settings import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results} class FlinkInterpreter extends Interpreter { private var bufferedReader: Option[BufferedReader] = None private var jprintWriter: JPrintWriter = _ private val config = new Configuration; private var cluster: LocalFlinkMiniCluster = _ @BeanProperty var imain: IMain = _ @BeanProperty var flinkILoop: FlinkILoop = _ private var out: ByteBufOutputStream = null private var outBuf: ByteBuf = null private var in: ByteBufInputStream = _ private var isRunning: AtomicBoolean = new AtomicBoolean(false) override def isOpen: Boolean = { isRunning.get() } def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = { config.toMap.toMap.foreach(println) config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1) config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true) config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) val localCluster = new LocalFlinkMiniCluster(config, false) localCluster.start(true) val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n") ("localhost", localCluster.getLeaderRPCPort, localCluster) } /** * Start flink cluster and create interpreter */ override def open: Unit = { outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480) out = new ByteBufOutputStream(outBuf) in = new ByteBufInputStream(outBuf) // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None)) val (host, port, localCluster) = startLocalMiniCluster() this.cluster = localCluster val conf = cluster.configuration println(s"Connecting to Flink cluster (host:$host,port:$port)...") flinkILoop = new FlinkILoop(host, port, conf, None) val settings = new Settings() settings.usejavacp.value = true settings.Yreplsync.value = true flinkILoop.settings_$eq(settings) flinkILoop.createInterpreter() imain = flinkILoop.intp FlinkInterpreter.ourClassloader = imain.classLoader val benv = flinkILoop.scalaBenv val senv = flinkILoop.scalaSenv benv.getConfig.disableSysoutLogging() senv.getConfig.disableSysoutLogging() // import libraries imain.interpret("import scala.tools.nsc.io._") // imain.interpret("import Properties.userHome") imain.interpret("import scala.compat.Platform.EOL") imain.interpret("import org.apache.flink.api.scala._") imain.interpret("import org.apache.flink.api.common.functions._") isRunning.set(true) } override def interpret(line: String): InterpreterResult = { if (line == null || line.trim.length == 0) { return new InterpreterResult(Code.SUCCESS) } interpret(line.split("\n")) } /** * Interprete code * @param lines * @return */ def interpret(lines: Array[String]): InterpreterResult = { val imain: IMain = getImain val linesToRun: Array[String] = new Array[String](lines.length + 1) for (i <- 0 until lines.length) { linesToRun(i) = lines(i) } linesToRun(lines.length) = "print(\"\")" System.setOut(new PrintStream(out)) out.buffer().clear() var r: Code = null var incomplete: String = "" var inComment: Boolean = false for (l <- 0 until linesToRun.length) { val s: String = linesToRun(l) var continuation: Boolean = false if (l + 1 < linesToRun.length) { val nextLine: String = linesToRun(l + 1).trim if (nextLine.isEmpty || nextLine.startsWith("//") || nextLine.startsWith("}") || nextLine.startsWith("object")) { continuation = true } else if (!inComment && nextLine.startsWith("/*")) { inComment = true continuation = true } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) { inComment = false continuation = true } else if (nextLine.length > 1 && nextLine.charAt(0) == '.' && nextLine.charAt(1) != '.' && nextLine.charAt(1) != '/') { continuation = true } else if (inComment) { continuation = true } if (continuation) { incomplete += s + "\n" } } if (!continuation) { val currentCommand: String = incomplete var res: Results.Result = null try { res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] { override def apply() = { imain.interpret(currentCommand + s) } }.apply()) } catch { case e: Exception => logError("Interpreter Exception ", e) return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)) } r = getResultCode(res) if (r == Code.ERROR) { return new InterpreterResult(r, out.toString) } else if (r eq Code.INCOMPLETE) { incomplete += s + "\n" } else { incomplete = "" } } } if (r eq Code.INCOMPLETE) { return new InterpreterResult(r, "Incomplete expression") } else { return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8"))) } } private def getResultCode(r: Results.Result): Code = { if (r.isInstanceOf[Results.Success.type]) { return Code.SUCCESS } else if (r.isInstanceOf[Results.Incomplete.type]) { return Code.INCOMPLETE } else { return Code.ERROR } } } } object FlinkInterpreter extends Logging { var ourClassloader: ClassLoader = _ def main(args: Array[String]): Unit = { val interpreter: FlinkInterpreter = new FlinkInterpreter val code = """ |val dataStream = senv.fromElements(1,2,3,4,5) |dataStream.countWindowAll(2).sum(0).print() |senv.execute("My streaming program") """.stripMargin interpreter.open val result = interpreter.interpret(code) } } The error messages i got are: … … ... [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000. [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor. [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940]. [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon. [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports. [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager. at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478) at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434) at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212) at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87) at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638) ... 34 elided Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager. at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309) at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467) ... 41 more Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission. at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119) at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251) at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89) at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)