roadan closed pull request #13 (AMATERASU-18). URL: https://github.com/apache/incubator-amaterasu/pull/13
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index 414787f..a64ebd0 100755 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,6 @@ bin/ #General .DS_Store -/.gitignore amaterasu-executor/ project/project/ +executor/spark-warehouse/ diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala index 35a6339..7c9f924 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala @@ -83,8 +83,8 @@ class ClusterConfig extends Logging { var memoryMB: Int = 1024 def load(props: Properties): Unit = { - if (props.containsKey("yarn.worker.cores")) Master.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int] - if (props.containsKey("yarn.worker.memoryMB")) Master.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int] + if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int] + if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int] } } diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala index 9df79f6..a745581 100644 --- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala @@ -18,4 +18,8 @@ package org.apache.amaterasu.common.dataobjects import org.apache.amaterasu.common.runtime.Environment + +/* TODO: Future eyal 
and yaniv - The TaskData class should support overriding configurations for execData configurations +// more specifiably, if execData holds configurations for spark setup (vcores/memory) a task should be able to override those +*/ case class TaskData(src: String, env: Environment, groupId: String, typeId: String, exports: Map[String, String]) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index d1c33bb..ff56d8c 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -48,7 +48,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { private var conf: Option[Map[String, Any]] = _ private var executorEnv: Option[Map[String, Any]] = _ - override def init(data: ExecData, + override def init(execData: ExecData, jobId: String, outStream: ByteArrayOutputStream, notifier: Notifier, @@ -63,32 +63,32 @@ class SparkRunnersProvider extends RunnersProvider with Logging { var jars = Seq.empty[String] - if (data.deps != null) { - jars ++= getDependencies(data.deps) + if (execData.deps != null) { + jars ++= getDependencies(execData.deps) } - if (data.pyDeps != null && - data.pyDeps.packages.nonEmpty) { - loadPythonDependencies(data.pyDeps, notifier) + if (execData.pyDeps != null && + execData.pyDeps.packages.nonEmpty) { + loadPythonDependencies(execData.pyDeps, notifier) } - conf = data.configurations.get("spark") - executorEnv = data.configurations.get("spark_exec_env") + conf = execData.configurations.get("spark") + executorEnv = execData.configurations.get("spark_exec_env") val sparkAppName = s"job_${jobId}_executor_$executorId" SparkRunnerHelper.notifier = notifier - val spark = 
SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, config, hostName) + val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName) - lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars) - sparkScalaRunner.initializeAmaContext(data.env) + lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars) + sparkScalaRunner.initializeAmaContext(execData.env) runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner) // TODO: get rid of hard-coded version - lazy val pySparkRunner = PySparkRunner(data.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", data.pyDeps, config) + lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config) runners.put(pySparkRunner.getIdentifier, pySparkRunner) - lazy val sparkSqlRunner = SparkSqlRunner(data.env, jobId, notifier, spark) + lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark) runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner) } diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java index 5a8665c..731efb8 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java @@ -18,16 +18,17 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.amaterasu.common.configuration.ClusterConfig; - import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory; import 
org.apache.amaterasu.leader.utilities.ActiveReportListener; -import org.apache.amaterasu.sdk.FrameworkSetupProvider; - +import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.barriers.DistributedBarrier; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -37,12 +38,6 @@ import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.barriers.DistributedBarrier; -import org.apache.curator.retry.ExponentialBackoffRetry; - import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,7 +137,7 @@ private void run(JobOpts opts, String[] args) throws Exception { } // setup frameworks - FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(config); + FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config); for (String group : frameworkFactory.groups()) { FrameworkSetupProvider framework = frameworkFactory.getFramework(group); diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala index 67d07a8..adaeae9 100644 --- 
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/execution/frameworks/FrameworkProvidersFactory.scala @@ -1,10 +1,8 @@ package org.apache.amaterasu.leader.execution.frameworks -import java.net.{URL, URLClassLoader} - import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.logging.Logging -import org.apache.amaterasu.sdk.FrameworkSetupProvider +import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider import org.reflections.Reflections import scala.collection.JavaConversions._ @@ -23,7 +21,7 @@ class FrameworkProvidersFactory { object FrameworkProvidersFactory extends Logging { - def apply(config: ClusterConfig): FrameworkProvidersFactory = { + def apply(env: String, config: ClusterConfig): FrameworkProvidersFactory = { val result = new FrameworkProvidersFactory() @@ -34,7 +32,7 @@ object FrameworkProvidersFactory extends Logging { val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[FrameworkSetupProvider] - provider.init(config) + provider.init(env, config) log.info(s"a provider for group ${provider.getGroupIdentifier} was created") (provider.getGroupIdentifier, provider) diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala index 8515102..0fe378a 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/frameworks/spark/SparkSetupProvider.scala @@ -3,18 +3,31 @@ package org.apache.amaterasu.leader.frameworks.spark import java.io.File import org.apache.amaterasu.common.configuration.ClusterConfig -import org.apache.amaterasu.common.utils.FileUtils -import org.apache.amaterasu.sdk.FrameworkSetupProvider +import 
org.apache.amaterasu.common.dataobjects.ExecData +import org.apache.amaterasu.leader.utilities.{DataLoader, MemoryFormatParser} +import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider +import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration import scala.collection.mutable class SparkSetupProvider extends FrameworkSetupProvider { + + private var env: String = _ private var conf: ClusterConfig = _ private val runnersResources = mutable.Map[String,Array[File]]() + private var execData: ExecData = _ + private var sparkExecConfigurations = mutable.Map[String, Any]() - override def init(conf: ClusterConfig): Unit = { + override def init(env: String, conf: ClusterConfig): Unit = { + this.env = env this.conf = conf + this.execData = DataLoader.getExecutorData(env, conf) + val sparkExecConfigurationsurations = execData.configurations.get("spark") + if (sparkExecConfigurationsurations.isEmpty) { + throw new Exception(s"Spark configuration files could not be loaded for the environment ${env}") + } + this.sparkExecConfigurations = sparkExecConfigurations ++ sparkExecConfigurationsurations.get runnersResources += "scala" -> Array.empty[File] runnersResources += "sql" -> Array.empty[File] @@ -32,4 +45,38 @@ class SparkSetupProvider extends FrameworkSetupProvider { runnersResources(runnerId) } + override def getDriverConfiguration: DriverConfiguration = { + var cpu: Int = 0 + if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) { + cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt + } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) { + cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt + } else if (conf.spark.opts.contains("yarn.am.cores")) { + cpu = conf.spark.opts("yarn.am.cores").toInt + } else if (conf.spark.opts.contains("driver.cores")) { + cpu = conf.spark.opts("driver.cores").toInt + } else if (conf.YARN.Worker.cores > 0) { + cpu = conf.YARN.Worker.cores + } else { + cpu = 
1 + } + var mem: Int = 0 + if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) { + mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString) + } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) { + mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString) + } else if (conf.spark.opts.contains("yarn.am.memory")) { + mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory")) + } else if (conf.spark.opts.contains("driver.memory")) { + mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory")) + } else if (conf.YARN.Worker.memoryMB > 0) { + mem = conf.YARN.Worker.memoryMB + } else if (conf.taskMem > 0) { + mem = conf.taskMem + } else { + mem = 1024 + } + + new DriverConfiguration(mem, cpu) + } } \ No newline at end of file diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala index ec9935c..86863f5 100755 --- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala @@ -29,6 +29,7 @@ import org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel import org.apache.amaterasu.common.execution.actions.{Notification, NotificationLevel, NotificationType} +import org.apache.amaterasu.leader.execution.frameworks.FrameworkProvidersFactory import org.apache.amaterasu.leader.execution.{JobLoader, JobManager} import org.apache.amaterasu.leader.utilities.{DataLoader, HttpServer} import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} @@ -216,6 +217,10 @@ class JobScheduler extends 
AmaterasuScheduler { } } + val frameworkFactory = FrameworkProvidersFactory.apply(env, config) + val frameworkProvider = frameworkFactory.providers(actionData.groupId) + val driverConfiguration = frameworkProvider.getDriverConfiguration + val actionTask = TaskInfo .newBuilder .setName(taskId.getValue) @@ -224,8 +229,8 @@ class JobScheduler extends AmaterasuScheduler { .setExecutor(executor) .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env))) - .addResources(createScalarResource("cpus", config.Jobs.Tasks.cpus)) - .addResources(createScalarResource("mem", config.Jobs.Tasks.mem)) + .addResources(createScalarResource("cpus", driverConfiguration.getCPUs)) + .addResources(createScalarResource("mem", driverConfiguration.getMemory)) .addResources(createScalarResource("disk", config.Jobs.repoSize)) .build() diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala new file mode 100644 index 0000000..c083bda --- /dev/null +++ b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/MemoryFormatParser.scala @@ -0,0 +1,18 @@ +package org.apache.amaterasu.leader.utilities + +object MemoryFormatParser { + + def extractMegabytes(input: String): Int = { + var result: Int = 0 + val lower = input.toLowerCase + if (lower.contains("mb")) { + result = lower.replace("mb", "").toInt + } else if (lower.contains("gb") | lower.contains("g")) { + result = lower.replace("g", "").replace("b","").toInt * 1024 + } else { + result = lower.toInt + } + + result + } +} diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index 65efecc..3fed076 100644 --- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -23,8 
+23,8 @@ import java.util import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} import javax.jms.Session -import org.apache.activemq.broker.BrokerService import org.apache.activemq.ActiveMQConnectionFactory +import org.apache.activemq.broker.BrokerService import org.apache.amaterasu.common.configuration.ClusterConfig import org.apache.amaterasu.common.dataobjects.ActionData import org.apache.amaterasu.common.logging.Logging @@ -157,7 +157,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { rmClient.init(conf) rmClient.start() - // Register with ResourceManager log.info("Registering application") val registrationResponse = rmClient.registerApplicationMaster("", 0, "") @@ -168,16 +167,26 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { log.info("Max vcores capability of resources in this cluster " + maxVCores) log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.registeredActions.size}") + // Resource requirements for worker containers - // TODO: this should be per task based on the framework config this.capability = Records.newRecord(classOf[Resource]) - this.capability.setMemory(Math.min(config.taskMem, 1024)) - this.capability.setVirtualCores(1) - + val frameworkFactory = FrameworkProvidersFactory.apply(env, config) while (!jobManager.outOfActions) { val actionData = jobManager.getNextActionData if (actionData != null) { + + val frameworkProvider = frameworkFactory.providers(actionData.groupId) + val driverConfiguration = frameworkProvider.getDriverConfiguration + + var mem: Int = driverConfiguration.getMemory + mem = Math.min(mem, maxMem) + this.capability.setMemory(mem) + + var cpu = driverConfiguration.getCPUs + cpu = Math.min(cpu, maxVCores) + this.capability.setVirtualCores(cpu) + askContainer(actionData) } } @@ -200,7 +209,6 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { } - private def askContainer(actionData: 
ActionData): Unit = { actionsBuffer.add(actionData) @@ -247,8 +255,8 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { val ctx = Records.newRecord(classOf[ContainerLaunchContext]) val commands: List[String] = List( "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && ", - s"/bin/bash spark/bin/load-spark-env.sh && ", - s"java -cp spark/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " + + s"/bin/bash ${config.spark.home}/bin/load-spark-env.sh && ", + s"java -cp ${config.spark.home}/jars/*:executor.jar:${config.spark.home}/conf/:${config.YARN.hadoopHomeDir}/conf/ " + "-Xmx1G " + "-Dscala.usejavacp=true " + "-Dhdp.version=2.6.1.0-129 " + @@ -274,7 +282,7 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark-version-info.properties"))), "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/dist/spark_intp.py")))) - val frameworkFactory = FrameworkProvidersFactory(config) + val frameworkFactory = FrameworkProvidersFactory(env, config) val framework = frameworkFactory.getFramework(actionData.groupId) //adding the framework and executor resources diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java similarity index 52% rename from sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java rename to sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java index dc31e4f..d1be723 100644 --- a/sdk/src/main/java/org/apache/amaterasu/sdk/FrameworkSetupProvider.java +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.java @@ -1,12 +1,13 @@ -package org.apache.amaterasu.sdk; +package org.apache.amaterasu.sdk.frameworks; import 
org.apache.amaterasu.common.configuration.ClusterConfig; +import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration; import java.io.File; public interface FrameworkSetupProvider { - void init(ClusterConfig conf); + void init(String env, ClusterConfig conf); String getGroupIdentifier(); @@ -14,4 +15,6 @@ File[] getRunnerResources(String runnerId); + DriverConfiguration getDriverConfiguration(); + } \ No newline at end of file diff --git a/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java new file mode 100644 index 0000000..ff9d7c7 --- /dev/null +++ b/sdk/src/main/java/org/apache/amaterasu/sdk/frameworks/configuration/DriverConfiguration.java @@ -0,0 +1,21 @@ +package org.apache.amaterasu.sdk.frameworks.configuration; + +public class DriverConfiguration { + + + private int memory = 0; + private int cpus = 0; + + public DriverConfiguration(int memory, int cpus) { + this.memory = memory; + this.cpus = cpus; + } + + public int getMemory() { + return memory; + } + + public int getCPUs() { + return cpus; + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services