Dear all,

Below is the code I execute:

import java.io._
import java.net.{URL, URLClassLoader}
import java.nio.charset.Charset
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

import com.netease.atom.common.util.logging.Logging
import com.netease.atom.interpreter.Code.Code
import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
InterpreterUtils}
import io.netty.buffer._
import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.client.CliFrontend
import org.apache.flink.client.cli.CliFrontendParser
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
ConfigConstants, GlobalConfiguration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
LocalFlinkMiniCluster}

import scala.Console
import scala.beans.BeanProperty
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.runtime.AbstractFunction0
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}

/**
 * Interpreter that evaluates Scala snippets in a Flink Scala shell ([[FlinkILoop]]) connected to
 * a local Flink mini cluster started inside this JVM.
 *
 * `open` starts the cluster and creates the REPL; `interpret` feeds code to the REPL and returns
 * the text that was written to standard output while the code ran.
 */
class FlinkInterpreter extends Interpreter {
  private var bufferedReader: Option[BufferedReader] = None
  private var jprintWriter: JPrintWriter = _
  private val config = new Configuration
  private var cluster: LocalFlinkMiniCluster = _
  @BeanProperty var imain: IMain = _
  @BeanProperty var flinkILoop: FlinkILoop = _
  // Stream/buffer pair that captures everything the interpreted code prints.
  private var out: ByteBufOutputStream = _
  private var outBuf: ByteBuf = _
  private var in: ByteBufInputStream = _
  private val isRunning: AtomicBoolean = new AtomicBoolean(false)

  /** True once `open` has finished setting up the cluster and the REPL. */
  override def isOpen: Boolean = isRunning.get()

  /**
   * Configures and starts a [[LocalFlinkMiniCluster]] with one JobManager, one TaskManager and
   * one ResourceManager, with the queryable-state server and the web server enabled.
   *
   * @return the host name (always "localhost"), the leader RPC port and the started cluster
   */
  def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
    // Dump the configuration as it is before the settings below are applied.
    config.toMap.toMap.foreach(println)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
    config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
    // NOTE(review): port 0 presumably asks the cluster to pick a free port - confirm.
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val localCluster = new LocalFlinkMiniCluster(config, false)
    localCluster.start(true)
    val rpcPort = localCluster.getLeaderRPCPort
    println(s"Starting local Flink cluster (host: localhost,port: $rpcPort).\n")
    ("localhost", rpcPort, localCluster)
  }

  /**
   * Start flink cluster and create interpreter.
   *
   * Allocates the output capture buffer, starts the local mini cluster, creates the
   * [[FlinkILoop]] REPL against it, pre-imports the common Flink/Scala packages and finally
   * marks this interpreter as running.
   */
  override def open: Unit = {
    outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
    out = new ByteBufOutputStream(outBuf)
    in = new ByteBufInputStream(outBuf)
    //    val (host, port, yarnCluster) =
    //      deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
    val (host, port, localCluster) = startLocalMiniCluster()
    this.cluster = localCluster
    val conf = cluster.configuration
    println(s"Connecting to Flink cluster (host:$host,port:$port)...")
    flinkILoop = new FlinkILoop(host, port, conf, None)
    val settings = new Settings()
    settings.usejavacp.value = true
    settings.Yreplsync.value = true
    flinkILoop.settings_$eq(settings)
    flinkILoop.createInterpreter()
    imain = flinkILoop.intp
    FlinkInterpreter.ourClassloader = imain.classLoader
    val benv = flinkILoop.scalaBenv
    val senv = flinkILoop.scalaSenv
    benv.getConfig.disableSysoutLogging()
    senv.getConfig.disableSysoutLogging()
    // import libraries
    imain.interpret("import scala.tools.nsc.io._")
    //    imain.interpret("import Properties.userHome")
    imain.interpret("import scala.compat.Platform.EOL")
    imain.interpret("import org.apache.flink.api.scala._")
    imain.interpret("import org.apache.flink.api.common.functions._")
    isRunning.set(true)
  }

  /**
   * Interprets a (possibly multi-line) snippet.
   *
   * @param line source text; `null` or blank input is reported as success without running anything
   * @return the result of interpreting the snippet line by line
   */
  override def interpret(line: String): InterpreterResult = {
    if (line == null || line.trim.length == 0) {
      new InterpreterResult(Code.SUCCESS)
    } else {
      interpret(line.split("\n"))
    }
  }

  /**
   * Interprets the given source lines.
   *
   * Lines are accumulated into one command while the ''following'' line looks like a
   * continuation (blank, `//` comment, starts with `}` or `object`, a leading-dot method chain,
   * or part of a block comment), then the accumulated command is handed to the REPL.
   *
   * @param lines source lines, without line terminators
   * @return ERROR with the captured output (or the exception message) as soon as a command
   *         fails, INCOMPLETE if the last command was incomplete, otherwise the last result
   *         code together with the captured output
   */
  def interpret(lines: Array[String]): InterpreterResult = {
    val repl: IMain = getImain
    // A trailing no-op statement is appended; presumably so that the snippet never ends on a
    // comment or continuation line and the last real command is always flushed to the REPL.
    val linesToRun: Array[String] = lines :+ "print(\"\")"

    // NOTE(review): this redirects the JVM-wide stdout and never restores it.
    System.setOut(new PrintStream(out))
    out.buffer().clear()

    var code: Code = null
    var pending: String = ""
    var inComment: Boolean = false
    var failure: Option[InterpreterResult] = None
    var idx = 0

    while (failure.isEmpty && idx < linesToRun.length) {
      val current = linesToRun(idx)
      var continuation = false

      if (idx + 1 < linesToRun.length) {
        val nextLine = linesToRun(idx + 1).trim
        if (nextLine.isEmpty ||
            nextLine.startsWith("//") ||
            nextLine.startsWith("}") ||
            nextLine.startsWith("object")) {
          continuation = true
        } else if (!inComment && nextLine.startsWith("/*")) {
          // Stay in comment mode only if the comment is not closed on the same line.
          inComment = !nextLine.substring(2).contains("*/")
          continuation = true
        } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
          // The open block comment ends on the next line.
          inComment = false
          continuation = true
        } else if (nextLine.length > 1 &&
            nextLine.charAt(0) == '.' &&
            nextLine.charAt(1) != '.' &&
            nextLine.charAt(1) != '/') {
          // Leading-dot method chain: ".foo", but not ".." or "./".
          continuation = true
        } else if (inComment) {
          continuation = true
        }
      }

      if (continuation) {
        pending += current + "\n"
      } else {
        val command = pending + current
        val outcome: Either[InterpreterResult, Results.Result] =
          try {
            Right(Console.withOut(System.out) {
              repl.interpret(command)
            })
          } catch {
            case e: Exception =>
              logError("Interpreter Exception ", e)
              Left(new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)))
          }

        outcome match {
          case Left(error) =>
            failure = Some(error)
          case Right(res) =>
            code = getResultCode(res)
            if (code == Code.ERROR) {
              // Report what the REPL printed, not the stream object's identity string.
              failure = Some(new InterpreterResult(code, capturedOutput))
            } else if (code eq Code.INCOMPLETE) {
              pending += current + "\n"
            } else {
              pending = ""
            }
        }
      }
      idx += 1
    }

    failure.getOrElse {
      if (code eq Code.INCOMPLETE) {
        new InterpreterResult(code, "Incomplete expression")
      } else {
        new InterpreterResult(code, capturedOutput)
      }
    }
  }

  /** Decodes everything currently held in the capture buffer as UTF-8 text. */
  private def capturedOutput: String = {
    System.out.flush()
    out.buffer().toString(Charset.forName("utf-8"))
  }

  /** Maps a REPL result to this project's result code; anything but success/incomplete is ERROR. */
  private def getResultCode(r: Results.Result): Code = r match {
    case Results.Success    => Code.SUCCESS
    case Results.Incomplete => Code.INCOMPLETE
    case _                  => Code.ERROR
  }
}

/**
 * Companion of the `FlinkInterpreter` class: holds the class loader that the class stores here
 * when it is opened, plus a manual entry point that runs a small streaming job.
 */
object FlinkInterpreter extends Logging {
  var ourClassloader: ClassLoader = _

  /**
   * Opens a fresh interpreter and submits a tiny streaming program
   * (count-window sum over five elements). The interpreter result is not inspected.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    val program =
      """
        |val dataStream = senv.fromElements(1,2,3,4,5)
        |dataStream.countWindowAll(2).sum(0).print()
        |senv.execute("My streaming program")
      """.stripMargin
    val repl = new FlinkInterpreter
    repl.open
    repl.interpret(program)
  }
}

The error messages I got are:
…
…
...
[WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] 
Discard message 
LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because 
the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not 
equal the received leader session ID 00000000-0000-0000-0000-000000000000.
[INFO] [17/09/13 12:05:52] 
[org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate 
JobClientActor.
[INFO] [17/09/13 12:05:52] 
[org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from 
JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
[INFO] [17/09/13 12:05:52] 
[akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote 
daemon.
[INFO] [17/09/13 12:05:52] 
[akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut 
down; proceeding with flushing remote transports.
[INFO] [17/09/13 12:05:52] 
[akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
  at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
  at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
  at 
org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
  at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
  at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
  ... 34 elided
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
retrieve the JobExecutionResult from the JobManager.
  at 
org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
  at 
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
  ... 41 more
Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
  at 
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
  at 
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
  at 
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
  at 
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
  at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




Reply via email to