Some code cleanups on Scala Shell
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb6de22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb6de22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb6de22

Branch: refs/heads/master
Commit: 6bb6de22fb6602058430d6d4eebf7cf36e404aca
Parents: 717f881
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue May 26 14:35:53 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu May 28 15:48:17 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/RemoteEnvironment.java       | 14 +----
 .../api/java/ScalaShellRemoteEnvironment.java   | 29 +++--------
 .../org.apache.flink/api/scala/FlinkILoop.scala | 54 ++++++++------------
 .../org.apache.flink/api/scala/FlinkShell.scala | 28 +++++-----
 4 files changed, 44 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index a2f2891..6f84077 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.common.PlanExecutor;
  */
 public class RemoteEnvironment extends ExecutionEnvironment {
 
-	private final String host;
+	protected final String host;
 
-	private final int port;
+	protected final int port;
 
 	private final String[] jarFiles;
 
@@ -87,14 +87,4 @@ public class RemoteEnvironment extends ExecutionEnvironment {
 		return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " +
 				(getParallelism() == -1 ? "default" : getParallelism()) + ") : " + getIdString();
 	}
-
-
-	// needed to call execute on ScalaShellRemoteEnvironment
-	public int getPort() {
-		return this.port;
-	}
-
-	public String getHost() {
-		return this.host;
-	}
 }
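
The getHost()/getPort() accessors could be dropped because host and port are
now protected, so subclasses read the fields directly. A minimal sketch of
what this enables (hypothetical subclass for illustration only, not part of
this commit; assumes the existing RemoteEnvironment(host, port, jarFiles...)
constructor):

  import org.apache.flink.api.java.RemoteEnvironment

  // Hypothetical subclass: the protected fields inherited from the
  // Java superclass are visible here without any getters.
  class LoggingRemoteEnvironment(h: String, p: Int, jars: String*)
    extends RemoteEnvironment(h, p, jars: _*) {

    def jobManagerAddress: String = host + ":" + port
  }
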
http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
index cb470c9..79f9576 100644
--- a/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-staging/flink-scala-shell/src/main/java/org.apache.flink/api/java/ScalaShellRemoteEnvironment.java
@@ -25,15 +25,12 @@ import org.apache.flink.api.common.PlanExecutor;
 
 import org.apache.flink.api.scala.FlinkILoop;
 
-import java.io.File;
-
 /**
- * ScalaShellRemoteEnvironment references the JobManager through host and port parameters,
- * and the Scala Shell (FlinkILoop).
- * Upon calling execute(), it reads compiled lines of the Scala shell, aggregates them to a Jar
- * and sends aggregated jar to the JobManager.
+ * Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
+ * to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
+ * use the reference of the ILoop to write the compiled classes of the current session to
+ * a Jar file and submit these with the program.
  */
-
 public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 
 	// reference to Scala Shell, for access to virtual directory
@@ -62,22 +59,12 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		Plan p = createProgramPlan(jobName);
 
-		// write virtual files to disk first
-		JarHelper jh = new JarHelper();
-
-		flinkILoop.writeFilesToDisk();
-
-		// jarr up.
-		File inFile = new File( flinkILoop.getTmpDirShell().getAbsolutePath());
-		File outFile = new File( flinkILoop.getTmpJarShell().getAbsolutePath());
-
-		jh.jarDir(inFile, outFile);
-
-		String[] jarFiles = {outFile.getAbsolutePath()};
+		String jarFile = flinkILoop.writeFilesToDisk().getAbsolutePath();
 
 		// call "traditional" execution methods
-		PlanExecutor executor = PlanExecutor.createRemoteExecutor(super.getHost(), super.getPort(), jarFiles);
+		PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFile);
+
+		executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled());
 		return executor.executePlan(p);
 	}
-}
\ No newline at end of file
+}
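
From inside the shell nothing changes: execute() is still triggered
implicitly, it just obtains the session Jar from the ILoop in one call now.
A hypothetical session against a running JobManager (prompt and output
abbreviated; uses the prebound "env" mentioned in the welcome text below):

  Scala-Flink> val text = env.fromElements("to be", "or not to be")
  Scala-Flink> val counts = text.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
  Scala-Flink> counts.print()  // triggers execute(), shipping the session Jar
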
http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
index 0de4953..83d0c72 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkILoop.scala
@@ -20,17 +20,17 @@ package org.apache.flink.api.scala
 
 import java.io.{BufferedReader, File, FileOutputStream}
 
-import scala.tools.nsc.Settings
 import scala.tools.nsc.interpreter._
 
-import org.apache.flink.api.java.ScalaShellRemoteEnvironment
+import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
 import org.apache.flink.util.AbstractID
 
-class FlinkILoop(val host: String,
-                 val port: Int,
-                 in0: Option[BufferedReader],
-                 out0: JPrintWriter)
+class FlinkILoop(
+    val host: String,
+    val port: Int,
+    in0: Option[BufferedReader],
+    out0: JPrintWriter)
   extends ILoop(in0, out0) {
 
   def this(host:String, port:Int, in0: BufferedReader, out: JPrintWriter){
@@ -52,11 +52,6 @@ class FlinkILoop(val host: String,
     scalaEnv
   }
 
-
-  /**
-   * CUSTOM START METHODS OVERRIDE:
-   */
-
   addThunk {
     intp.beQuietDuring {
       // automatically imports the flink scala api
     }
   }
@@ -68,7 +63,6 @@ class FlinkILoop(val host: String,
   }
 
-
   /**
   * creates a temporary directory to store compiled console files
  */
@@ -96,18 +90,20 @@ class FlinkILoop(val host: String,
 
   /**
-   * writes contents of the compiled lines that have been executed in the shell into a
-   * "physical directory": creates a unique temporary directory
+   * Packages the compiled classes of the current shell session into a Jar file for execution
+   * on a Flink cluster.
+   *
+   * @return The path of the created Jar file
   */
-  def writeFilesToDisk(): Unit = {
+  def writeFilesToDisk(): File = {
     val vd = intp.virtualDirectory
 
-    var vdIt = vd.iterator
+    val vdIt = vd.iterator
 
     for (fi <- vdIt) {
       if (fi.isDirectory) {
 
-        var fiIt = fi.iterator
+        val fiIt = fi.iterator
 
         for (f <- fiIt) {
@@ -128,6 +124,14 @@ class FlinkILoop(val host: String,
         }
       }
     }
+
+    val compiledClasses = new File(tmpDirShell.getAbsolutePath)
+    val jarFilePath = new File(tmpJarShell.getAbsolutePath)
+
+    val jh: JarHelper = new JarHelper
+    jh.jarDir(compiledClasses, jarFilePath)
+
+    jarFilePath
   }
 
   /**
@@ -183,20 +187,4 @@ NOTE: Use the prebound Execution Environment "env" to read data and execute your
 HINT: You can use print() on a DataSet to print the contents to this shell.
       """)
   }
-
-  // getter functions:
-  // get (root temporary folder)
-  def getTmpDirBase(): File = {
-    return (this.tmpDirBase);
-  }
-
-  // get shell folder name inside tmp dir
-  def getTmpDirShell(): File = {
-    return (this.tmpDirShell)
-  }
-
-  // get tmp jar file name
-  def getTmpJarShell(): File = {
-    return (this.tmpJarShell)
-  }
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb6de22/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
index 90615ec..7ad1d2f 100644
--- a/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
+++ b/flink-staging/flink-scala-shell/src/main/scala/org.apache.flink/api/scala/FlinkShell.scala
@@ -24,8 +24,6 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 
-
-
 object FlinkShell {
 
   def main(args: Array[String]) {
@@ -33,28 +31,29 @@ object FlinkShell {
 
     // scopt, command line arguments
     case class Config(port: Int = -1, host: String = "none")
-    val parser = new scopt.OptionParser[Config] ("scopt") {
-      head ("scopt", "3.x")
+    val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
+      head ("Flink Scala Shell")
+
       opt[Int] ('p', "port") action {
         (x, c) =>
           c.copy (port = x)
-      } text ("port specifies port of running JobManager")
+      } text("port specifies port of running JobManager")
+
       opt[(String)] ('h',"host") action {
         case (x, c) =>
           c.copy (host = x)
-      } text ("host specifies host name of running JobManager")
-      help("help") text("prints this usage text")
+      } text("host specifies host name of running JobManager")
 
+      help("help") text("prints this usage text")
     }
 
     // parse arguments
-    parser.parse (args, Config () ) map {
-      config =>
-        startShell(config.host,config.port);
-    } getOrElse {
-      // arguments are bad, usage message will have been displayed
-      println("Could not parse program arguments")
+    parser.parse (args, Config () ) match {
+      case Some(config) =>
+        startShell(config.host,config.port)
+
+      case _ => println("Could not parse program arguments")
     }
   }
@@ -65,8 +64,7 @@ object FlinkShell {
     var cluster: LocalFlinkMiniCluster = null
 
     // either port or userhost not specified by user, create new minicluster
-    val (host,port) = if (userHost == "none" || userPort == -1 )
-    {
+    val (host,port) = if (userHost == "none" || userPort == -1 ) {
       println("Creating new local server")
       cluster = new LocalFlinkMiniCluster(new Configuration, false)
       ("localhost",cluster.getJobManagerRPCPort)
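
The diff is truncated above, but the visible branch shows the fallback: when
the user supplies no host/port, startShell() brings up a
LocalFlinkMiniCluster and connects to its JobManager. A sketch of the full
conditional, assuming the else branch simply passes the user's values
through:

  // Assumed shape of the complete branch; only the if-side is shown above.
  val (host, port) =
    if (userHost == "none" || userPort == -1) {
      println("Creating new local server")
      cluster = new LocalFlinkMiniCluster(new Configuration, false)
      ("localhost", cluster.getJobManagerRPCPort)
    } else {
      (userHost, userPort)
    }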