eyalbenivri closed pull request #11: Amaterasu-14
URL: https://github.com/apache/incubator-amaterasu/pull/11
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/build.gradle b/common/build.gradle index 0b6cf8a..5a0a211 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -47,7 +47,7 @@ dependencies { // currently we have to use this specific mesos version to prevent from // clashing with spark - compile('org.apache.mesos:mesos:0.22.2:shaded-protobuf'){ + compile('org.apache.mesos:mesos:0.22.2:shaded-protobuf') { exclude group: 'com.google.protobuf', module: 'protobuf-java' } provided group: 'org.apache.hadoop', name: 'hadoop-yarn-client', version: '2.7.3' diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala index 653c285..35a6339 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala @@ -40,7 +40,7 @@ class ClusterConfig extends Logging { var distLocation: String = "local" var workingFolder: String = "" // TODO: get rid of hard-coded version - var pysparkPath: String = "spark-2.1.1-bin-hadoop2.7/bin/spark-submit" + var pysparkPath: String = "spark-2.2.1-bin-hadoop2.7/bin/spark-submit" var Jar: String = _ var JarName: String = _ // the additionalClassPath is currently for testing purposes, when amaterasu is diff --git a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala index e126241..f49d8ad 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala @@ -25,6 +25,6 @@ case class Environment() { var outputRootPath: String = "" var workingDir: String = "" - var configuration: Map[String, String] = null + var configuration: Map[String, String] = _ } \ No newline at end of file diff --git a/executor/build.gradle b/executor/build.gradle index 30076d4..a173c58 100644 --- a/executor/build.gradle +++ b/executor/build.gradle @@ -55,25 +55,29 @@ dependencies { compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8' compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' - - compile ('com.jcabi:jcabi-aether:0.10.1') { - exclude group: 'org.jboss.netty' - } + compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final' + compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5' compile group: 'org.reflections', name: 'reflections', version: '0.9.10' compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5' compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5' compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5' compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5' - compile group: 'org.apache.activemq', name: 'activemq-client', version: '5.15.2' compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0' + compile('com.jcabi:jcabi-aether:0.10.1') 
{ + exclude group: 'org.jboss.netty' + } + compile('org.apache.activemq:activemq-client:5.15.2') { + exclude group: 'org.jboss.netty' + } + compile project(':common') compile project(':amaterasu-sdk') //runtime dependency for spark - provided ('org.apache.spark:spark-repl_2.11:2.1.1') - provided ('org.apache.spark:spark-core_2.11:2.1.1') + provided('org.apache.spark:spark-repl_2.11:2.2.1') + provided('org.apache.spark:spark-core_2.11:2.2.1') testCompile project(':common') testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" @@ -81,7 +85,7 @@ dependencies { testCompile 'junit:junit:4.11' testCompile 'org.scalatest:scalatest_2.11:3.0.2' testCompile 'org.scala-lang:scala-library:2.11.8' - testCompile ('org.apache.spark:spark-repl_2.11:2.1.1') + testCompile('org.apache.spark:spark-repl_2.11:2.2.1') testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9' } diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py index f8536ea..f1752a2 100755 --- a/executor/src/main/resources/spark_intp.py +++ b/executor/src/main/resources/spark_intp.py @@ -22,7 +22,7 @@ # sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark') # sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j') -# py4j_path = 'spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip' +# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip' # py4j_importer = zipimport.zipimporter(py4j_path) # py4j = py4j_importer.load_module('py4j') from py4j.java_gateway import JavaGateway, GatewayClient, java_import diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala index 2f73b71..0c2edf8 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.amaterasu.executor.common.executors import javax.jms.{DeliveryMode, MessageProducer, Session} diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala index eec0106..5f78bc2 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala @@ -18,6 +18,7 @@ package org.apache.amaterasu.executor.common.executors import java.io.ByteArrayOutputStream +import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ExecData import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider} @@ -52,12 +53,18 @@ object ProvidersFactory { val result = new ProvidersFactory() val reflections = new Reflections(getClass.getClassLoader) val runnerTypes = reflections.getSubTypesOf(classOf[RunnersProvider]).toSet + val config = if (propFile != null) { + import java.io.FileInputStream + ClusterConfig.apply(new FileInputStream(propFile)) + } else { + new ClusterConfig() + } result.providers = runnerTypes.map(r => { val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider] - provider.init(data, jobId, outStream, notifier, executorId, propFile, hostName) + provider.init(data, jobId, outStream, notifier, executorId, config, hostName) notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created") (provider.getGroupIdentifier, provider) }).toMap diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala index 7f5955e..94b8056 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala @@ -177,7 +177,7 @@ object PySparkRunner { private def installAnacondaOnNode(): Unit = { Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") - Seq("bash", "-c", "ln -s $PWD/spark-2.1.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") + Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") } diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index ce3b2ba..d1c33bb 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -25,6 +25,7 @@ import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage} import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner +import 
org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider} import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner} import org.eclipse.aether.util.artifact.JavaScopes @@ -52,10 +53,9 @@ class SparkRunnersProvider extends RunnersProvider with Logging { outStream: ByteArrayOutputStream, notifier: Notifier, executorId: String, - propFile: String, + config: ClusterConfig, hostName: String): Unit = { - val config = ClusterConfig(new FileInputStream(propFile)) shellLoger = ProcessLogger( (o: String) => log.info(o), (e: String) => log.error("", e) @@ -77,7 +77,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { val sparkAppName = s"job_${jobId}_executor_$executorId" SparkRunnerHelper.notifier = notifier - val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, propFile, hostName) + val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, config, hostName) lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars) sparkScalaRunner.initializeAmaContext(data.env) @@ -86,7 +86,10 @@ class SparkRunnersProvider extends RunnersProvider with Logging { // TODO: get rid of hard-coded version lazy val pySparkRunner = PySparkRunner(data.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", data.pyDeps, config) - runners.put(pySparkRunner.getIdentifier(), pySparkRunner) + runners.put(pySparkRunner.getIdentifier, pySparkRunner) + + lazy val sparkSqlRunner = SparkSqlRunner(data.env, jobId, notifier, spark) + runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner) } private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = { @@ -102,7 +105,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { // TODO: get rid of hard-coded version Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger - Seq("bash", "-c", "ln -s $PWD/spark-2.1.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger + Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! 
shellLoger } private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = { diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala index 7a7fd0d..350ddb4 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkSql/SparkSqlRunner.scala @@ -17,23 +17,26 @@ package org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql import java.io.File +import java.util import org.apache.amaterasu.common.execution.actions.Notifier import org.apache.amaterasu.common.logging.Logging import org.apache.amaterasu.common.runtime.Environment import org.apache.amaterasu.executor.runtime.AmaContext +import org.apache.amaterasu.sdk.AmaterasuRunner import org.apache.commons.io.FilenameUtils import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import scala.collection.JavaConverters._ /** * Amaterasu currently supports JSON and PARQUET as data sources. * CSV data source support will be provided in later versions. */ -class SparkSqlRunner extends Logging { +class SparkSqlRunner extends Logging with AmaterasuRunner { var env: Environment = _ var notifier: Notifier = _ var jobId: String = _ - var actionName: String = _ + //var actionName: String = _ var spark: SparkSession = _ /* @@ -42,17 +45,19 @@ class SparkSqlRunner extends Logging { If not in Amaterasu format, then directly executes the query @Params: query string */ - def executeQuery(query: String): Unit = { + override def executeSource(actionSource: String, actionName: String, exports: util.Map[String, String]): Unit = { - notifier.info(s"================= executing the SQL query =================") - if (!query.isEmpty) { + notifier.info(s"================= started action $actionName =================") - if (query.toLowerCase.contains("amacontext")) { + if (!actionSource.isEmpty) { + + var result: DataFrame = null + if (actionSource.toLowerCase.contains("amacontext")) { //Parse the incoming query - notifier.info(s"================= parsing the SQL query =================") + //notifier.info(s"================= parsing the SQL query =================") - val parser: List[String] = query.toLowerCase.split(" ").toList + val parser: List[String] = actionSource.toLowerCase.split(" ").toList var sqlPart1: String = "" var sqlPart2: String = "" var queryTempLen: Int = 0 @@ -93,26 +98,31 @@ class SparkSqlRunner extends Logging { val loadData: DataFrame = AmaContext.getDataFrame(actionName, dfName, fileFormat) loadData.createOrReplaceTempView(locationPath) - notifier.info("Executing SparkSql on: "+parsedQuery) - val sqlDf = spark.sql(parsedQuery) - //@TODO: outputFileFormat should be read from YAML file instead of input fileformat - writeDf(sqlDf, fileFormat, env.workingDir, jobId, actionName) - notifier.info(s"================= finished action $actionName =================") + try{ + + result = spark.sql(parsedQuery) + notifier.success(parsedQuery) + } catch { + case e: Exception => notifier.error(parsedQuery, e.getMessage) + } + } else { - notifier.info("Executing SparkSql on: "+query) - - val fildDf = spark.sql(query) - //@TODO: outputFileFormat should be read from YAML file instead of output fileFormat being empty - writeDf(fildDf, "", env.workingDir, jobId, 
actionName) + notifier.info("Executing SparkSql on: " + actionSource) - notifier.info(s"================= finished action $actionName =================") + result = spark.sql(actionSource) + } + val exportsBuff = exports.asScala.toBuffer + if (exportsBuff.nonEmpty) { + val exportName = exportsBuff.head._1 + val exportFormat = exportsBuff.head._2 + //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName") + result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName") } + notifier.info(s"================= finished action $actionName =================") } - - notifier.info(s"================= finished action $actionName =================") } /* @@ -128,25 +138,7 @@ class SparkSqlRunner extends Logging { extensions } - /* - Method to write dataframes to a specified format - @Params - df: Dataframe to be written - fileFormat: same as input file format - workingDir: temp directory - jobId, actionName: As specified by the user - */ - def writeDf(df: DataFrame, outputFileFormat: String, workingDir: String, jobId: String, actionName: String): Unit = { - outputFileFormat.toLowerCase match { - case "parquet" => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - case "json" => df.write.mode(SaveMode.Overwrite).json(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - case "csv" => df.write.mode(SaveMode.Overwrite).csv(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - case "orc" => df.write.mode(SaveMode.Overwrite).orc(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - case "text" => df.write.mode(SaveMode.Overwrite).text(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - //case "jdbc" => df.write.mode(SaveMode.Overwrite).jdbc(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - case _ => df.write.mode(SaveMode.Overwrite).parquet(s"$workingDir/$jobId/$actionName/" + actionName + "Df") - } - } + override def getIdentifier: String = "sql" } @@ -154,7 +146,7 @@ object SparkSqlRunner { def apply(env: Environment, jobId: String, - actionName: String, + // actionName: String, notifier: Notifier, spark: SparkSession): SparkSqlRunner = { @@ -162,7 +154,7 @@ object SparkSqlRunner { sparkSqlRunnerObj.env = env sparkSqlRunnerObj.jobId = jobId - sparkSqlRunnerObj.actionName = actionName + //sparkSqlRunnerObj.actionName = actionName sparkSqlRunnerObj.notifier = notifier sparkSqlRunnerObj.spark = spark sparkSqlRunnerObj diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala similarity index 95% rename from executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala rename to executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala index 969eb0b..9ab75be 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/ActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala @@ -33,7 +33,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.util.{Failure, Success} -class ActionsExecutor extends Executor with Logging { +class MesosActionsExecutor extends Executor with Logging { var master: String = _ var executorDriver: ExecutorDriver = _ @@ -83,7 +83,7 @@ class ActionsExecutor extends Executor with 
Logging { notifier = new MesosNotifier(driver) notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered") val outStream = new ByteArrayOutputStream() - providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName) + providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties") } @@ -133,13 +133,13 @@ class ActionsExecutor extends Executor with Logging { } -object ActionsExecutorLauncher extends Logging { +object MesosActionsExecutor extends Logging { def main(args: Array[String]) { System.loadLibrary("mesos") log.debug("Starting a new ActionExecutor") - val executor = new ActionsExecutor + val executor = new MesosActionsExecutor executor.jobId = args(0) executor.master = args(1) executor.actionName = args(2) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala index 9a17b5c..841fe42 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.amaterasu.executor.yarn.executors import org.apache.amaterasu.common.execution.actions.Notifier diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala index ba6a3e1..abab8a4 100644 --- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala @@ -26,7 +26,7 @@ import org.apache.amaterasu.common.utils.FileUtils import org.apache.spark.repl.amaterasu.AmaSparkILoop import org.apache.spark.sql.SparkSession import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkConf import scala.tools.nsc.GenericRunnerSettings import scala.tools.nsc.interpreter.IMain @@ -37,7 +37,6 @@ object SparkRunnerHelper extends Logging { private val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) private val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") - private var sparkContext: SparkContext = _ private var sparkSession: SparkSession = _ var notifier: Notifier = _ @@ -97,7 +96,7 @@ object SparkRunnerHelper extends Logging { } catch { case e: Exception => - println("+++++++>" + new Predef.String(outStream.toByteArray)) + println(new Predef.String(outStream.toByteArray)) } @@ -110,19 +109,14 @@ object SparkRunnerHelper extends Logging { jars: Seq[String], sparkConf: Option[Map[String, Any]], executorEnv: Option[Map[String, Any]], - propFile: String, + config: ClusterConfig, hostName: String): SparkSession = { - val config = if (propFile != null) { - import java.io.FileInputStream - ClusterConfig.apply(new FileInputStream(propFile)) - } else { - new ClusterConfig() - } - Thread.currentThread().setContextClassLoader(getClass.getClassLoader) - - val pyfiles = FileUtils.getAllFiles(new File("miniconda/pkgs")).filter(f => f.getName.endsWith(".py") || + val minicondaPkgsPath = "miniconda/pkgs" + val executorMinicondaDirRef = new File(minicondaPkgsPath) + val minicondaFiles = if (executorMinicondaDirRef.exists) FileUtils.getAllFiles(executorMinicondaDirRef) else new Array[File](0) + val pyfiles = minicondaFiles.filter(f => f.getName.endsWith(".py") || f.getName.endsWith(".egg") || f.getName.endsWith(".zip")) @@ -132,30 +126,36 @@ object SparkRunnerHelper extends Logging { .set("spark.hadoop.validateOutputSpecs", "false") .set("spark.logConf", "true") .set("spark.submit.pyFiles", pyfiles.mkString(",")) - .setJars("executor.jar" +: jars) + val master: String = if (env.master.isEmpty) { + "yarn" + } else { + env.master + } + config.mode match { case "mesos" => - conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.1.1-bin-hadoop2.7.tgz") + conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz") + .setJars(jars) .set("spark.master", env.master) - .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.1.1-bin-hadoop2.7") + .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-2.2.1-bin-hadoop2.7") case "yarn" => conf.set("spark.home", config.spark.home) // TODO: parameterize those + .setJars(s"executor-${config.version}-all.jar" +: jars) .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab") .set("spark.driver.extraLibraryPath", 
"/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64") .set("spark.yarn.queue", "default") .set("spark.history.kerberos.principal", "none") - .set("spark.master", "yarn") + .set("spark.master", master) .set("spark.executor.instances", "1") // TODO: change this .set("spark.yarn.jars", s"spark/jars/*") .set("spark.executor.memory", "1g") .set("spark.dynamicAllocation.enabled", "false") - //.set("spark.shuffle.service.enabled", "true") .set("spark.eventLog.enabled", "false") .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/") .set("hadoop.home.dir", config.YARN.hadoopHomeDir) @@ -201,7 +201,6 @@ object SparkRunnerHelper extends Logging { //.enableHiveSupport() .config(conf).getOrCreate() - log.info("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^") sparkSession.conf.getAll.foreach(x => log.info(x.toString)) val hc = sparkSession.sparkContext.hadoopConfiguration diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala index 9261080..a45b8c0 100755 --- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala +++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkScalaRunner.scala @@ -99,7 +99,6 @@ class SparkScalaRunner(var env: Environment, case ds: Dataset[_] => log.debug(s"persisting DataFrame: $resultName") val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")""" - //notifier.info(writeLine) val writeResult = interpreter.interpret(writeLine) if (writeResult != Results.Success) { val err = outStream.toString diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala index 0312dad..153d984 100644 --- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkSqlRunnerTests.scala @@ -23,12 +23,13 @@ import org.apache.amaterasu.utilities.TestNotifier import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.sql.{SaveMode, SparkSession} -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - +import org.scalatest.{BeforeAndAfterAll, DoNotDiscover, FlatSpec, Matchers} +import scala.collection.JavaConverters._ /** * Created by kirupa on 10/12/16. 
*/ +@DoNotDiscover class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll { Logger.getLogger("org").setLevel(Level.OFF) @@ -44,7 +45,7 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll override protected def beforeAll(): Unit = { - val env = new Environment() + val env = Environment() env.workingDir = "file:/tmp/" spark = SparkSession.builder() .appName("sql-job") @@ -69,17 +70,18 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll "SparkSql" should "load data as parquet if no input foramt is specified" in { - val defaultParquetEnv = new Environment() + val defaultParquetEnv = Environment() defaultParquetEnv.workingDir = "file:/tmp/" AmaContext.init(spark, "sparkSqlDefaultParquetJob", defaultParquetEnv) + //Prepare test dataset val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath) inputDf.write.mode(SaveMode.Overwrite).parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionTempDf") - val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", "sparkSqlDefaultParquetJobAction", notifier, spark) - sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22") - val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sparkSqlDefaultParquetJobAction/sparkSqlDefaultParquetJobActionDf") - println("Output Default Parquet: "+inputDf.count + "," + outputDf.first().getString(1)) - outputDf.first().getString(1) shouldEqual("Michael") + val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlDefaultParquetJob", notifier, spark) + sparkSql.executeSource("select * FROM AMACONTEXT_sparkSqlDefaultParquetJobAction_sparkSqlDefaultParquetJobActionTempDf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava) + val outputDf = spark.read.parquet(s"${defaultParquetEnv.workingDir}/sparkSqlDefaultParquetJob/sql_parquet_test/result") + println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1)) + outputDf.first().getString(1) shouldEqual "Michael" } /* @@ -88,17 +90,18 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll "SparkSql" should "load PARQUET data directly from previous action's dataframe and persist the Data in working directory" in { - val tempParquetEnv = new Environment() + val tempParquetEnv = Environment() tempParquetEnv.workingDir = "file:/tmp/" AmaContext.init(spark, "sparkSqlParquetJob", tempParquetEnv) + //Prepare test dataset val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath) inputDf.write.mode(SaveMode.Overwrite).parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionTempDf") - val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", "sparkSqlParquetJobAction", notifier, spark) - sparkSql.executeQuery("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet") - val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sparkSqlParquetJobAction/sparkSqlParquetJobActionDf") - println("Output Parquet: "+inputDf.count + "," + outputDf.count) - inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1)) + val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlParquetJob", notifier, 
spark) + sparkSql.executeSource("select * FROM AMACONTEXT_sparkSqlParquetJobAction_sparkSqlParquetJobActionTempDf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava) + val outputDf = spark.read.parquet(s"${tempParquetEnv.workingDir}/sparkSqlParquetJob/sql_parquet_test/result2") + println("Output Parquet: " + inputDf.count + "," + outputDf.count) + inputDf.first().getString(1) shouldEqual outputDf.first().getString(1) } @@ -108,17 +111,18 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll "SparkSql" should "load JSON data directly from previous action's dataframe and persist the Data in working directory" in { - val tempJsonEnv = new Environment() + val tempJsonEnv = Environment() tempJsonEnv.workingDir = "file:/tmp/" AmaContext.init(spark, "sparkSqlJsonJob", tempJsonEnv) //Prepare test dataset + val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath) inputDf.write.mode(SaveMode.Overwrite).json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionTempDf") - val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", "sparkSqlJsonJobAction", notifier, spark) - sparkSql.executeQuery("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf where age='30' READAS json") - val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sparkSqlJsonJobAction/sparkSqlJsonJobActionDf") - println("Output JSON: "+inputDf.count + "," + outputDf.count) - outputDf.first().getString(1) shouldEqual("Kirupa") + val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlJsonJob", notifier, spark) + sparkSql.executeSource("select * FROM amacontext_sparkSqlJsonJobAction_sparkSqlJsonJobActionTempDf where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava) + val outputDf = spark.read.json(s"${tempJsonEnv.workingDir}/sparkSqlJsonJob/sql_json_test/result") + println("Output JSON: " + inputDf.count + "," + outputDf.count) + outputDf.first().getString(1) shouldEqual "Kirupa" } @@ -128,40 +132,42 @@ class SparkSqlRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll "SparkSql" should "load CSV data directly from previous action's dataframe and persist the Data in working directory" in { - val tempCsvEnv = new Environment() + val tempCsvEnv = Environment() tempCsvEnv.workingDir = "file:/tmp/" AmaContext.init(spark, "sparkSqlCsvJob", tempCsvEnv) + //Prepare test dataset val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath) inputDf.write.mode(SaveMode.Overwrite).csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionTempDf") - val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", "sparkSqlCsvJobAction", notifier, spark) - sparkSql.executeQuery("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv") - val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sparkSqlCsvJobAction/sparkSqlCsvJobActionDf") - println("Output CSV: "+inputDf.count + "," + outputDf.count) - inputDf.first().getString(1) shouldEqual(outputDf.first().getString(1)) + val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlCsvJob", notifier, spark) + sparkSql.executeSource("select * FROM amacontext_sparkSqlCsvJobAction_sparkSqlCsvJobActionTempDf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava) + + val outputDf = spark.read.csv(s"${tempCsvEnv.workingDir}/sparkSqlCsvJob/sql_csv_test/result") + 
println("Output CSV: " + inputDf.count + "," + outputDf.count) + inputDf.first().getString(1) shouldEqual outputDf.first().getString(1) } + /* Test whether the data can be directly read from a file and executed by sparkSql */ - - "SparkSql" should "load data directly from a file and persist the Data in working directory" in { - - val tempFileEnv = new Environment() - tempFileEnv.workingDir = "file:/tmp/" - AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv) - - val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", "sparkSqlFileJobAction", notifier, spark) - sparkSql.executeQuery("SELECT * FROM parquet.`"+getClass.getResource("/SparkSql/parquet").getPath+"`") - val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf") - println("Output Parquet dataframe: "+ outputParquetDf.show) - outputParquetDf.first().getString(1) shouldEqual("Michael") - sparkSql.executeQuery("SELECT * FROM json.`"+getClass.getResource("/SparkSql/json").getPath+"`") - //@TODO: change the below read.parquet to read.outputFileFormat specified in the yaml file - val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sparkSqlFileJobAction/sparkSqlFileJobActionDf") - println("Output Json dataframe: "+ outputJsonDf.show) - outputJsonDf.first().getString(1) shouldEqual("Sampath") - - } +// "SparkSql" should "load data directly from a file and persist the Data in working directory" in { +// +// val tempFileEnv = Environment() +// tempFileEnv.workingDir = "file:/tmp/" +// AmaContext.init(spark, "sparkSqlFileJob", tempFileEnv) +// +// val sparkSql: SparkSqlRunner = SparkSqlRunner(AmaContext.env, "sparkSqlFileJob", notifier, spark) +// sparkSql.executeSource("SELECT * FROM parquet.`" + getClass.getResource("/SparkSql/parquet").getPath + "`", "sql_parquet_file_test", Map("result" -> "parquet").asJava) +// val outputParquetDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result") +// println("Output Parquet dataframe: " + outputParquetDf.show) +// outputParquetDf.first().getString(1) shouldEqual "Michael" +// sparkSql.executeSource("SELECT * FROM json.`" + getClass.getResource("/SparkSql/json").getPath + "`","sql_parquet_file_test", Map("result" -> "json").asJava) +// +// val outputJsonDf = spark.read.parquet(s"${tempFileEnv.workingDir}/sparkSqlFileJob/sql_parquet_file_test/result") +// println("Output Json dataframe: " + outputJsonDf.show) +// outputJsonDf.first().getString(1) shouldEqual "Sampath" +// +// } } diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala index 505ade6..49ab882 100644 --- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkTestsSuite.scala @@ -28,17 +28,25 @@ import org.apache.spark.repl.amaterasu.runners.spark.SparkScalaRunner import org.apache.spark.sql.SparkSession import org.scalatest._ + + import scala.collection.mutable.ListBuffer class SparkTestsSuite extends Suites( - new PySparkRunnerTests(), - new RunnersLoadingTests()) with BeforeAndAfterAll { + new PySparkRunnerTests, + new RunnersLoadingTests, + new SparkSqlRunnerTests) with BeforeAndAfterAll { var env: Environment = _ var factory: ProvidersFactory = _ var spark: SparkSession = _ + private def createTestMiniconda(): Unit = { + println(s"PATH: ${new File(".").getAbsolutePath}") + new 
File("miniconda/pkgs").mkdirs() + } + override def beforeAll(): Unit = { env = Environment() @@ -48,11 +56,6 @@ class SparkTestsSuite extends Suites( // I can't apologise enough for this val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent - val conf = Map[String, Any]( - "spark.cassandra.connection.host" -> "127.0.0.1", - "sourceTable" -> "documents", - "spark.local.ip" -> "127.0.0.1" - ) env.master = "local[1]" if (env.configuration != null) env.configuration ++ "pysparkPath" -> "/usr/bin/python" else env.configuration = Map( "pysparkPath" -> "/usr/bin/python", @@ -61,6 +64,7 @@ class SparkTestsSuite extends Suites( val excEnv = Map[String, Any]( "PYTHONPATH" -> resources ) + createTestMiniconda() env.configuration ++ "spark_exec_env" -> excEnv factory = ProvidersFactory(ExecData(env, Dependencies(ListBuffer.empty[Repo], List.empty[Artifact]), @@ -72,7 +76,8 @@ class SparkTestsSuite extends Suites( new ByteArrayOutputStream(), new TestNotifier(), "test", - getClass.getResource("/amaterasu.properties").getPath) + "localhost", + getClass.getClassLoader.getResource("amaterasu.properties").getPath) spark = factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner].spark this.nestedSuites.filter(s => s.isInstanceOf[RunnersLoadingTests]).foreach(s => s.asInstanceOf[RunnersLoadingTests].factory = factory) @@ -83,6 +88,7 @@ class SparkTestsSuite extends Suites( } override def afterAll(): Unit = { + new File("miniconda").delete() spark.stop() super.afterAll() diff --git a/leader/build.gradle b/leader/build.gradle index 429f072..8595d02 100644 --- a/leader/build.gradle +++ b/leader/build.gradle @@ -52,7 +52,7 @@ dependencies { compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.2.19.v20160908' compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.2.19.v20160908' - compile group: 'javax.servlet', name: 'javax.servlet-api', version: '2.5.0' + //compile group: 'javax.servlet', name: 'javax.servlet-api', version: '3.1.0' compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0' compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r' compile group: 'org.yaml', name: 'snakeyaml', version: '1.18' @@ -71,9 +71,7 @@ dependencies { testCompile 'junit:junit:4.11' testCompile 'org.scalatest:scalatest_2.11:3.0.2' testCompile 'org.scala-lang:scala-library:2.11.8' - testCompile( 'org.apache.curator:curator-test:2.9.1'){ - exclude group: 'com.fasterxml.jackson.core', module: 'jackson-databind' - } + testCompile 'org.apache.curator:curator-test:2.9.1' } diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java index b89c263..be0fc05 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.amaterasu.leader.yarn; import org.apache.commons.cli.*; diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java index eb2659d..b8c29b7 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.amaterasu.leader.yarn; public class JobOpts { diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala index 631564e..0706491 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala @@ -3,6 +3,7 @@ package org.apache.amaterasu.leader.mesos import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.leader.mesos.schedulers.JobScheduler import org.apache.amaterasu.leader.utilities.{Args, BaseJobLauncher} +import org.apache.log4j.LogManager import org.apache.mesos.Protos.FrameworkID import org.apache.mesos.{MesosSchedulerDriver, Protos} @@ -13,6 +14,8 @@ import org.apache.mesos.{MesosSchedulerDriver, Protos} object MesosJobLauncher extends BaseJobLauncher { override def run(arguments: Args, config: ClusterConfig, resume: Boolean): Unit = { + LogManager.resetConfiguration() + val frameworkBuilder = Protos.FrameworkInfo.newBuilder() .setName(s"${arguments.name} - Amaterasu Job") .setFailoverTimeout(config.timeout) diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index f5b572a..ec9935c 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -33,6 +33,7 @@ import org.apache.amaterasu.leader.execution.{JobLoader, JobManager} import org.apache.amaterasu.leader.utilities.{DataLoader, HttpServer} import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import 
org.apache.curator.retry.ExponentialBackoffRetry +import org.apache.log4j.LogManager import org.apache.mesos.Protos.CommandInfo.URI import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.ByteString @@ -48,6 +49,7 @@ import scala.collection.concurrent.TrieMap */ class JobScheduler extends AmaterasuScheduler { + LogManager.resetConfiguration() private var jobManager: JobManager = _ private var client: CuratorFramework = _ private var config: ClusterConfig = _ @@ -163,7 +165,7 @@ class JobScheduler extends AmaterasuScheduler { val command = CommandInfo .newBuilder .setValue( - s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-0.2.0-incubating-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.ActionsExecutorLauncher ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin + s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")} env MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/dist/spark-${config.Webserver.sparkVersion}.tgz java -cp executor-0.2.0-incubating-all.jar:spark-${config.Webserver.sparkVersion}/jars/* -Dscala.usejavacp=true -Djava.library.path=/usr/lib org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor ${jobManager.jobId} ${config.master} ${actionData.name}""".stripMargin ) // HttpServer.getFilesInDirectory(sys.env("AMA_NODE"), config.Webserver.Port).foreach(f=> // ) @@ -173,7 +175,7 @@ class JobScheduler extends AmaterasuScheduler { .setExtract(false) .build()) .addUris(URI.newBuilder() - .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.1.1-bin-hadoop2.7.tgz") + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/spark-2.2.1-bin-hadoop2.7.tgz") .setExecutable(false) .setExtract(true) .build()) @@ -197,6 +199,11 @@ class JobScheduler extends AmaterasuScheduler { .setExecutable(false) .setExtract(false) .build()) + .addUris(URI.newBuilder() + .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/amaterasu.properties") + .setExecutable(false) + .setExtract(false) + .build()) executor = ExecutorInfo .newBuilder .setData(ByteString.copyFrom(execData)) @@ -310,6 +317,7 @@ object JobScheduler { report: String, home: String): JobScheduler = { + LogManager.resetConfiguration() val scheduler = new JobScheduler() HttpServer.start(config.Webserver.Port, s"$home/${config.Webserver.Root}") diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala index 24b9e90..2e01963 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala @@ -16,10 +16,9 @@ */ package org.apache.amaterasu.leader.utilities -//import org.apache.amaterasu.Logging import org.apache.amaterasu.common.logging.Logging import org.apache.log4j.{BasicConfigurator, Level, Logger} -import org.eclipse.jetty.server.{Server, ServerConnector} +import org.eclipse.jetty.server.{Handler, Server, ServerConnector} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder} import 
org.eclipse.jetty.toolchain.test.MavenTestingUtils @@ -38,39 +37,29 @@ import scala.text.Document * Implementation of Jetty Web server to server Amaterasu libraries and other distribution files */ object HttpServer extends Logging { - val logger = Logger.getLogger(HttpServer.getClass) - var server: Server = null + + var server: Server = _ def start(port: String, serverRoot: String): Unit = { - /*val threadPool = new QueuedThreadPool(Runtime.getRuntime.availableProcessors() * 16) - threadPool.setName("Jetty")*/ BasicConfigurator.configure() initLogging() + server = new Server() val connector = new ServerConnector(server) connector.setPort(port.toInt) server.addConnector(connector) - val rh0 = new ResourceHandler() - rh0.setDirectoriesListed(true) - rh0.setResourceBase(serverRoot) - val context0 = new ContextHandler() - context0.setContextPath("/*") - //context0.setContextPath("/") - //val dir0 = MavenTestingUtils.getTestResourceDir("dist") - //context0.setBaseResource(Resource.newResource(dir0)) - context0.setHandler(rh0) - val context = new ServletContextHandler(ServletContextHandler.SESSIONS) - context.setResourceBase(serverRoot) - context.setContextPath("/") - context.setErrorHandler(new ErrorHandler()) - context.setInitParameter("dirAllowed", "true") - context.setInitParameter("pathInfoOnly", "true") - context.addServlet(new ServletHolder(new DefaultServlet()), "/") - val contexts = new ContextHandlerCollection() - contexts.setHandlers(Array(context0, context)) - server.setHandler(contexts) + + val handler = new ResourceHandler() + handler.setDirectoriesListed(true) + handler.setWelcomeFiles(Array[String]("index.html")) + handler.setResourceBase(serverRoot) + val handlers = new HandlerList() + handlers.setHandlers(Array(handler, new DefaultHandler())) + + server.setHandler(handlers) server.start() + } def stop() { @@ -93,11 +82,13 @@ object HttpServer extends Logging { Note: Should the files in URL root be fetched, provide an empty value to directory. */ def getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array[String] = { + println("http://" + amaNode + ":" + port + "/" + directory) val html: BufferedSource = Source.fromURL("http://" + amaNode + ":" + port + "/" + directory) + println(html) val htmlDoc = Jsoup.parse(html.mkString) val htmlElement: Elements = htmlDoc.body().select("a") val files = htmlElement.asScala - val fileNames = files.map(url => url.attr("href")).filter(file => (!file.contains(".."))).map(name => name.replace("/", "")).toArray + val fileNames = files.map(url => url.attr("href")).filter(file => !file.contains("..")).map(name => name.replace("/", "")).toArray fileNames } } \ No newline at end of file diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh index 6c9139c..18dbed9 100755 --- a/leader/src/main/scripts/ama-start-mesos.sh +++ b/leader/src/main/scripts/ama-start-mesos.sh @@ -95,7 +95,7 @@ esac done echo "repo: ${REPO} " -CMD="java -cp ${BASEDIR}/bin/*-all.jar -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.JobLauncher --home ${BASEDIR}" +CMD="java -cp ${BASEDIR}/bin/*-all.jar -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}" if [ -n "$REPO" ]; then CMD+=" --repo ${REPO}" @@ -123,13 +123,14 @@ fi if ! 
ls ${BASEDIR}/dist/spark*.tgz 1> /dev/null 2>&1; then echo "${bold} Fetching spark distributable ${NC}" - wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist + #wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist + wget http://apache.mirror.digitalpacific.com.au/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz -P ${BASEDIR}/dist fi if [ ! -f ${BASEDIR}/dist/Miniconda2-latest-Linux-x86_64.sh ]; then echo "${bold}Fetching miniconda distributable ${NC}" wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -P ${BASEDIR}/dist fi - +cp ${BASEDIR}/amaterasu.properties ${BASEDIR}/dist eval $CMD | grep "===>" echo "" diff --git a/leader/src/main/scripts/amaterasu.properties b/leader/src/main/scripts/amaterasu.properties index 57c6006..7961db9 100755 --- a/leader/src/main/scripts/amaterasu.properties +++ b/leader/src/main/scripts/amaterasu.properties @@ -5,7 +5,7 @@ user=root mode=yarn webserver.port=8000 webserver.root=dist -spark.version=2.1.1-bin-hadoop2.7 +spark.version=2.2.1-bin-hadoop2.7 yarn.queue=default yarn.jarspath=hdfs:///apps/amaterasu spark.home=/usr/hdp/current/spark2-client diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala index 4034c8e..bd200a0 100644 --- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala +++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala @@ -32,46 +32,47 @@ class HttpServerTests extends FlatSpec with Matchers { // this is an ugly hack, getClass.getResource("/").getPath should have worked but // stopped working when we moved to gradle :( + val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent - -// "Jetty Web server" should "start HTTP server, serve content and stop successfully" in { -// val resources = new File(getClass.getResource("/simple-maki.yml").getPath).getParent -// var data = "" -// try { -// HttpServer.start("8000", resources) -// val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt") -// data = html.mkString -// } -// finally { -// HttpServer.stop() -// } -// data should equal("This is a test file to download from Jetty webserver") -// } + // "Jetty Web server" should "start HTTP server, serve content and stop successfully" in { + // + // var data = "" + // try { + // HttpServer.start("8000", resources) + // val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt") + // data = html.mkString + // } + // finally { + // HttpServer.stop() + // } + // data should equal("This is a test file to download from Jetty webserver") + // } "Jetty File server with '/' as root" should "start HTTP server, serve content and stop successfully" in { - var data = "" - val resources = new File(getClass.getResource("/dist").getPath).getParent - var urlCount:Int = 0 - println("resource location"+resources) + + var urlCount: Int = 0 + println("resource location" + resources) try { - HttpServer.start("8000",resources) - val urls = HttpServer.getFilesInDirectory("localhost","8000","dist") + HttpServer.start("8000", resources) + val urls = HttpServer.getFilesInDirectory("127.0.0.1", "8000", "dist") urls.foreach(println) urlCount = urls.length + } catch { + case e: Exception => println(s"++++>> ${e.getMessage}") } finally { HttpServer.stop() } urlCount should equal(2) } + "Jetty File server with 'dist' as root" should "start HTTP server, serve content and 
stop successfully" in { var data = "" - val resources = new File(getClass.getResource("/dist").getPath).getParent - var urlCount:Int = 0 - println("resource location"+resources) + var urlCount: Int = 0 + println("resource location" + resources) try { - HttpServer.start("8000",resources+"/dist") - val urls = HttpServer.getFilesInDirectory("localhost","8000","") + HttpServer.start("8000", resources + "/dist") + val urls = HttpServer.getFilesInDirectory("localhost", "8000", "") urls.foreach(println) urlCount = urls.length } diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java index 01fe266..1c6ed8c 100644 --- a/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/RunnersProvider.java @@ -16,6 +16,7 @@ */ package org.apache.amaterasu.sdk; +import org.apache.amaterasu.common.configuration.ClusterConfig; import org.apache.amaterasu.common.dataobjects.ExecData; import org.apache.amaterasu.common.execution.actions.Notifier; @@ -33,7 +34,7 @@ void init(ExecData data, ByteArrayOutputStream outStream, Notifier notifier, String executorId, - String propFile, + ClusterConfig config, String hostName); String getGroupIdentifier(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
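
Editor's note: one cross-cutting change in the diff above is that runner providers no longer receive the path to amaterasu.properties. ProvidersFactory now parses the file once into a ClusterConfig (falling back to a default instance when no path is supplied) and hands the parsed object to RunnersProvider.init and SparkRunnerHelper.createSpark. A minimal sketch of that flow, using only names that appear in the diff; the surrounding executor wiring (data, jobId, outStream, notifier, executorId, hostName, provider) is assumed to exist as before and is not shown:

```scala
import java.io.FileInputStream
import org.apache.amaterasu.common.configuration.ClusterConfig

// Parse amaterasu.properties once (or fall back to defaults), as ProvidersFactory.apply
// now does, and hand the parsed object to each provider instead of the file path.
def buildConfig(propFile: String): ClusterConfig =
  if (propFile != null) ClusterConfig(new FileInputStream(propFile))
  else new ClusterConfig()

// New provider call shape (previously the last-but-one argument was the propFile path):
// provider.init(data, jobId, outStream, notifier, executorId, buildConfig(propFile), hostName)
```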
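Editor's note: the SparkSqlRunner rework is also worth illustrating. The class now implements AmaterasuRunner, and the old executeQuery/writeDf pair is replaced by executeSource, which takes the action source, an action name, and an exports map that controls where and in what format the resulting DataFrame is written (under ${env.workingDir}/$jobId/$actionName/$exportName). A hedged usage sketch based on the tests in the diff; the SparkSession, Environment and Notifier set-up around it is assumed, and the query and table names are illustrative only:

```scala
import scala.collection.JavaConverters._
import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner

// env, notifier and spark are assumed to be initialised as in SparkSqlRunnerTests.
val sparkSql = SparkSqlRunner(env, "sparkSqlParquetJob", notifier, spark)

// The exports map replaces the removed writeDf(): its first (name -> format) entry
// determines the output directory name and the DataFrameWriter format.
sparkSql.executeSource(
  "select * FROM AMACONTEXT_sparkSqlParquetJobAction_someTempDf where age=22", // illustrative query
  "sql_parquet_test",                  // action name, used in the output path
  Map("result" -> "parquet").asJava)   // export name -> output format
```

Note that the implementation in this PR only consumes the head of the exports map, so callers are expected to pass a single entry; the runner registers itself under the identifier "sql".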