[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535683#comment-16535683 ] ASF GitHub Bot commented on AMATERASU-24: - roadan commented on issue #30: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro… URL: https://github.com/apache/incubator-amaterasu/pull/30#issuecomment-403198711 Tested on mesos and yarn This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor Spark out of Amaterasu executor to it's own project > > > Key: AMATERASU-24 > URL: https://issues.apache.org/jira/browse/AMATERASU-24 > Project: AMATERASU > Issue Type: Improvement >Reporter: Yaniv Rodenski >Assignee: Arun Manivannan >Priority: Major > Fix For: 0.2.1-incubating > > > The Spark framework is a part of the Amaterasu executor and leader, it needs > to be under it own project under a new frameworks folder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535684#comment-16535684 ] ASF GitHub Bot commented on AMATERASU-24: - roadan closed pull request #30: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro… URL: https://github.com/apache/incubator-amaterasu/pull/30 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index fabb847..dbb51cb 100755 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,5 @@ bin/ amaterasu-executor/ project/project/ executor/spark-warehouse/ +repo +repo/** diff --git a/build.gradle b/build.gradle index 0f11347..00e44ea 100644 --- a/build.gradle +++ b/build.gradle @@ -25,10 +25,6 @@ allprojects { version '0.2.0-incubating-rc4' } -project(':leader') -project(':common') -project(':executor') - task copyLeagalFiles(type: Copy) { from "./DISCLAIMER", "./LICENSE", "./NOTICE" into "${buildDir}/amaterasu" diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala index 8a44019..fe69260 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala @@ -16,9 +16,9 @@ */ package org.apache.amaterasu.common.execution.actions -import NotificationLevel.NotificationLevel -import NotificationType.NotificationType import com.fasterxml.jackson.annotation.JsonProperty +import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel +import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType abstract class Notifier { diff --git 
a/executor/build.gradle b/executor/build.gradle index 21bc2b0..09e269c 100644 --- a/executor/build.gradle +++ b/executor/build.gradle @@ -54,7 +54,6 @@ dependencies { compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8' compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' -compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final' compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5' @@ -75,18 +74,7 @@ dependencies { compile project(':common') compile project(':amaterasu-sdk') -//runtime dependency for spark -provided('org.apache.spark:spark-repl_2.11:2.2.1') -provided('org.apache.spark:spark-core_2.11:2.2.1') - -testCompile project(':common') -testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" -testRuntime 'org.pegdown:pegdown:1.1.0' -testCompile 'junit:junit:4.11' -testCompile 'org.scalatest:scalatest_2.11:3.0.2' -testCompile 'org.scala-lang:scala-library:2.11.8' -testCompile('org.apache.spark:spark-repl_2.11:2.2.1') -testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9' + } diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala index 9ab75be..90c2001 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala @@ -26,7 +26,6 @@ import org.apache.amaterasu.executor.common.executors.ProvidersFactory import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver} -import org.apache.spark.SparkContext import 
scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global @@ -37,7 +36,6 @@ class MesosActionsExecutor extends Executor with Logging { var master: String = _ var executorDriver: ExecutorDriver = _ - var sc: SparkContext = _ var jobId: String = _ var actionName: String = _ // var sparkScalaRunner: SparkScalaRunner = _ @@ -83,7 +81,7 @@ class MesosActionsExecutor extends Executor with Logging { notifier = new MesosNotifier(driver) notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered") val outStream = new ByteArrayOutputStream() -providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties") +providersFactory =
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16529002#comment-16529002 ] ASF GitHub Bot commented on AMATERASU-24: - arunma opened a new pull request #30: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro… URL: https://github.com/apache/incubator-amaterasu/pull/30 …ject (Master) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor Spark out of Amaterasu executor to it's own project > > > Key: AMATERASU-24 > URL: https://issues.apache.org/jira/browse/AMATERASU-24 > Project: AMATERASU > Issue Type: Improvement >Reporter: Yaniv Rodenski >Assignee: Arun Manivannan >Priority: Major > Fix For: 0.2.1-incubating > > > The Spark framework is a part of the Amaterasu executor and leader, it needs > to be under it own project under a new frameworks folder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528932#comment-16528932 ] ASF GitHub Bot commented on AMATERASU-24: - roadan closed pull request #27: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own project URL: https://github.com/apache/incubator-amaterasu/pull/27 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index 0f11347..00e44ea 100644 --- a/build.gradle +++ b/build.gradle @@ -25,10 +25,6 @@ allprojects { version '0.2.0-incubating-rc4' } -project(':leader') -project(':common') -project(':executor') - task copyLeagalFiles(type: Copy) { from "./DISCLAIMER", "./LICENSE", "./NOTICE" into "${buildDir}/amaterasu" diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala index 8a44019..fe69260 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala @@ -16,9 +16,9 @@ */ package org.apache.amaterasu.common.execution.actions -import NotificationLevel.NotificationLevel -import NotificationType.NotificationType import com.fasterxml.jackson.annotation.JsonProperty +import org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel +import org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType abstract class Notifier { diff --git a/executor/build.gradle b/executor/build.gradle index 21bc2b0..09e269c 100644 --- a/executor/build.gradle +++ b/executor/build.gradle @@ -54,7 +54,6 @@ dependencies { compile group: 'org.scala-lang', name: 
'scala-library', version: '2.11.8' compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8' -compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8' compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final' compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5' compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5' @@ -75,18 +74,7 @@ dependencies { compile project(':common') compile project(':amaterasu-sdk') -//runtime dependency for spark -provided('org.apache.spark:spark-repl_2.11:2.2.1') -provided('org.apache.spark:spark-core_2.11:2.2.1') - -testCompile project(':common') -testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14" -testRuntime 'org.pegdown:pegdown:1.1.0' -testCompile 'junit:junit:4.11' -testCompile 'org.scalatest:scalatest_2.11:3.0.2' -testCompile 'org.scala-lang:scala-library:2.11.8' -testCompile('org.apache.spark:spark-repl_2.11:2.2.1') -testCompile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9' + } diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala index 9ab75be..90c2001 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala @@ -26,7 +26,6 @@ import org.apache.amaterasu.executor.common.executors.ProvidersFactory import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver} -import org.apache.spark.SparkContext import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global @@ -37,7 +36,6 @@ class MesosActionsExecutor extends Executor with Logging { var master: String = _ var executorDriver: ExecutorDriver = _ - 
var sc: SparkContext = _ var jobId: String = _ var actionName: String = _ // var sparkScalaRunner: SparkScalaRunner = _ @@ -83,7 +81,7 @@ class MesosActionsExecutor extends Executor with Logging { notifier = new MesosNotifier(driver) notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered") val outStream = new ByteArrayOutputStream() -providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, propFile = "./amaterasu.properties") +providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, "./amaterasu.properties") } diff --git
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528931#comment-16528931 ] ASF GitHub Bot commented on AMATERASU-24: - roadan commented on issue #27: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own project URL: https://github.com/apache/incubator-amaterasu/pull/27#issuecomment-401573891 Tested on YARN and Mesos This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor Spark out of Amaterasu executor to it's own project > > > Key: AMATERASU-24 > URL: https://issues.apache.org/jira/browse/AMATERASU-24 > Project: AMATERASU > Issue Type: Improvement >Reporter: Yaniv Rodenski >Assignee: Arun Manivannan >Priority: Major > Fix For: 0.2.1-incubating > > > The Spark framework is a part of the Amaterasu executor and leader, it needs > to be under it own project under a new frameworks folder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-28) Pull Miniconda version away from compiled code
[ https://issues.apache.org/jira/browse/AMATERASU-28?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525900#comment-16525900 ] ASF GitHub Bot commented on AMATERASU-28: - roadan closed pull request #21: AMATERASU-28 Miniconda version pulling away from code URL: https://github.com/apache/incubator-amaterasu/pull/21 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala index 94b8056..5897e1d 100755 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala @@ -175,7 +175,7 @@ object PySparkRunner { * Installs Anaconda and then links it with the local spark that was installed on the executor. 
*/ private def installAnacondaOnNode(): Unit = { -Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") +Seq("bash", "-c", "sh miniconda-install.sh -b -p $PWD/miniconda") Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") } diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index ff56d8c..3d33e8e 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -67,11 +67,6 @@ class SparkRunnersProvider extends RunnersProvider with Logging { jars ++= getDependencies(execData.deps) } -if (execData.pyDeps != null && - execData.pyDeps.packages.nonEmpty) { - loadPythonDependencies(execData.pyDeps, notifier) -} - conf = execData.configurations.get("spark") executorEnv = execData.configurations.get("spark_exec_env") val sparkAppName = s"job_${jobId}_executor_$executorId" @@ -85,56 +80,21 @@ class SparkRunnersProvider extends RunnersProvider with Logging { runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner) // TODO: get rid of hard-coded version -lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config) +lazy val pySparkRunner = + PySparkRunner( +execData.env, +jobId, +notifier, +spark, +s"${config.spark.home}/python:${config.spark.home}/python/pyspark", +execData.pyDeps, +config) runners.put(pySparkRunner.getIdentifier, pySparkRunner) 
lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark) runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner) } - private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = { -val channel = pythonPackage.channel.getOrElse("anaconda") -if (channel == "anaconda") { - Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger -} else { - Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger -} - } - - private def installAnacondaOnNode(): Unit = { -// TODO: get rid of hard-coded version -Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger -Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger -Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger - } - - private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = { -notifier.info("loading anaconda evn") -installAnacondaOnNode() -val codegenPackage = PythonPackage("codegen", channel = Option("auto")) -installAnacondaPackage(codegenPackage) -try { - // notifier.info("loadPythonDependencies #5") - deps.packages.foreach(pack => { -pack.index.getOrElse("anaconda").toLowerCase match { - case
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506816#comment-16506816 ] ASF GitHub Bot commented on AMATERASU-24: - arunma commented on issue #24: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro… URL: https://github.com/apache/incubator-amaterasu/pull/24#issuecomment-395939568 @roadan @eyalbenivri @nadav-har-tzvi The kicking off the executor needs some work but appreciate if you could have a quick look and let me know if the changes are on the right track. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor Spark out of Amaterasu executor to it's own project > > > Key: AMATERASU-24 > URL: https://issues.apache.org/jira/browse/AMATERASU-24 > Project: AMATERASU > Issue Type: Improvement >Reporter: Yaniv Rodenski >Assignee: Arun Manivannan >Priority: Major > Fix For: 0.2.1-incubating > > > The Spark framework is a part of the Amaterasu executor and leader, it needs > to be under it own project under a new frameworks folder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506815#comment-16506815 ] ASF GitHub Bot commented on AMATERASU-24: - arunma opened a new pull request #24: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro… URL: https://github.com/apache/incubator-amaterasu/pull/24 …ject This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor Spark out of Amaterasu executor to it's own project > > > Key: AMATERASU-24 > URL: https://issues.apache.org/jira/browse/AMATERASU-24 > Project: AMATERASU > Issue Type: Improvement >Reporter: Yaniv Rodenski >Assignee: Arun Manivannan >Priority: Major > Fix For: 0.2.1-incubating > > > The Spark framework is a part of the Amaterasu executor and leader, it needs > to be under it own project under a new frameworks folder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-24) Refactor Spark out of Amaterasu executor to its own project
[ https://issues.apache.org/jira/browse/AMATERASU-24?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491973#comment-16491973 ] ASF GitHub Bot commented on AMATERASU-24: - arunma opened a new pull request #22: AMATERASU-24 Refactor Spark out of Amaterasu executor to it's own pro… URL: https://github.com/apache/incubator-amaterasu/pull/22 …ject This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor Spark out of Amaterasu executor to it's own project > > > Key: AMATERASU-24 > URL: https://issues.apache.org/jira/browse/AMATERASU-24 > Project: AMATERASU > Issue Type: Improvement >Reporter: Yaniv Rodenski >Priority: Major > Fix For: 0.2.1-incubating > > > The Spark framework is a part of the Amaterasu executor and leader, it needs > to be under it own project under a new frameworks folder -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-21) Fix Spark scala tests
[ https://issues.apache.org/jira/browse/AMATERASU-21?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491564#comment-16491564 ] ASF GitHub Bot commented on AMATERASU-21: - eyalbenivri closed pull request #19: AMATERASU-21 Fix Spark scala tests URL: https://github.com/apache/incubator-amaterasu/pull/19 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/executor/src/test/resources/simple-spark.scala b/executor/src/test/resources/simple-spark.scala index 797235d..34798e7 100755 --- a/executor/src/test/resources/simple-spark.scala +++ b/executor/src/test/resources/simple-spark.scala @@ -1,11 +1,13 @@ + import org.apache.amaterasu.executor.runtime.AmaContext -import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} + +val data = Seq(1,3,4,5,6) -val data = Array(1, 2, 3, 4, 5) val sc = AmaContext.sc val rdd = sc.parallelize(data) -val sqlContext = AmaContext.sqlContext +val sqlContext = AmaContext.spark import sqlContext.implicits._ val x: DataFrame = rdd.toDF() diff --git a/executor/src/test/resources/step-2.scala b/executor/src/test/resources/step-2.scala index 34ad839..189701f 100755 --- a/executor/src/test/resources/step-2.scala +++ b/executor/src/test/resources/step-2.scala @@ -1,7 +1,5 @@ import org.apache.amaterasu.executor.runtime.AmaContext -val oddRdd = AmaContext.getRDD[Int]("start", "rdd").filter(x=>x/2 == 0) -oddRdd.take(5).foreach(println) -val highNoDf = AmaContext.getDataFrame("start", "x").where("_1 > 3") +val highNoDf = AmaContext.getDataFrame("start", "x").where("age > 20") highNoDf.show diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-0-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet 
b/executor/src/test/resources/tmp/job/start/x/part-r-0-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet new file mode 100644 index 000..e1b0d2e Binary files /dev/null and b/executor/src/test/resources/tmp/job/start/x/part-r-0-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ diff --git a/executor/src/test/resources/tmp/job/start/x/part-r-1-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet b/executor/src/test/resources/tmp/job/start/x/part-r-1-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet new file mode 100644 index 000..d807ba9 Binary files /dev/null and b/executor/src/test/resources/tmp/job/start/x/part-r-1-c370548d-1371-4c08-81d6-f2ba72defa12.gz.parquet differ diff --git a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala index d41feea..68c06ce 100755 --- a/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala +++ b/executor/src/test/scala/org/apache/amaterasu/spark/SparkScalaRunnerTests.scala @@ -1,54 +1,56 @@ -//package org.apache.amaterasu.spark -// -//import java.io.File -// -//import org.apache.amaterasu.common.runtime._ -//import org.apache.amaterasu.common.configuration.ClusterConfig -//import org.apache.amaterasu.utilities.TestNotifier -// -//import scala.collection.JavaConverters._ -//import org.apache.commons.io.FileUtils -//import java.io.ByteArrayOutputStream -// -//import org.apache.spark.SparkConf -//import org.apache.spark.repl.Main -//import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner} -//import org.apache.spark.sql.SparkSession -//import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} -// -//class SparkScalaRunnerTests extends FlatSpec with Matchers with BeforeAndAfterAll { -// -// var runner: SparkScalaRunner = _ -// -// override protected def beforeAll(): Unit = { -// -//FileUtils.deleteQuietly(new File("/tmp/job_5/")) -// -//val env = Environment() 
-//env.workingDir = "file:///tmp" -//env.master = "local[*]" -// -// -//val spark = SparkRunnerHelper.createSpark(env, "job_5", Seq.empty[String], Map.empty) -// -// -//val notifier = new TestNotifier() -//val strm = new ByteArrayOutputStream() -//runner = SparkScalaRunner(env, "job_5", spark, strm, notifier, Seq.empty[String]) -//super.beforeAll() -// } -// -// "SparkScalaRunner" should "execute the simple-spark.scala" in { -// -//val script = getClass.getResource("/simple-spark.scala").getPath -//runner.executeSource(script, "start", Map.empty[String, String].asJava) -// -// } -// -// "SparkScalaRunner" should "execute step-2.scala and access data from simple-spark.scala" in { -// -//val script = getClass.getResource("/step-2.scala").getPath -//runner.executeSource(script, "cont", Map.empty[String, String].asJava) -// -// } -//} \ No newline at end of file +/*
[jira] [Commented] (AMATERASU-26) Pipeline tasks (sub-Yarn jobs) run as "yarn" user instead of inheriting the user in which the amaterasu job was submitted
[ https://issues.apache.org/jira/browse/AMATERASU-26?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16491558#comment-16491558 ] ASF GitHub Bot commented on AMATERASU-26: - eyalbenivri closed pull request #18: AMATERASU-26 Pipeline tasks runs as "yarn" user instead of inheriting… URL: https://github.com/apache/incubator-amaterasu/pull/18 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java index dc4f15e..e3c2812 100644 --- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java +++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -110,8 +111,9 @@ private void run(JobOpts opts, String[] args) throws Exception { List commands = Collections.singletonList( -"env AMA_NODE=" + System.getenv("AMA_NODE") + " " + -"$JAVA_HOME/bin/java" + +"env AMA_NODE=" + System.getenv("AMA_NODE") + +" env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() + +" $JAVA_HOME/bin/java" + " -Dscala.usejavacp=false" + " -Xmx1G" + " org.apache.amaterasu.leader.yarn.ApplicationMaster " + diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala index a44202a..1828100 100644 --- 
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala +++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala @@ -21,8 +21,8 @@ import java.net.{InetAddress, ServerSocket, URLEncoder} import java.nio.ByteBuffer import java.util import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} -import javax.jms.Session +import javax.jms.Session import org.apache.activemq.ActiveMQConnectionFactory import org.apache.activemq.broker.BrokerService import org.apache.amaterasu.common.configuration.ClusterConfig @@ -153,21 +153,14 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { // TODO: awsEnv currently set to empty string. should be changed to read values from (where?). allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar) -rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this) -rmClient.init(conf) -rmClient.start() - -// Register with ResourceManager -log.info("Registering application") -val registrationResponse = rmClient.registerApplicationMaster("", 0, "") -log.info("Registered application") +rmClient = startRMClient() +val registrationResponse = registerAppMaster("", 0, "") val maxMem = registrationResponse.getMaximumResourceCapability.getMemory log.info("Max mem capability of resources in this cluster " + maxMem) val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores log.info("Max vcores capability of resources in this cluster " + maxVCores) log.info(s"Created jobManager. 
jobManager.registeredActions.size: ${jobManager.registeredActions.size}") - // Resource requirements for worker containers this.capability = Records.newRecord(classOf[Resource]) val frameworkFactory = FrameworkProvidersFactory.apply(env, config) @@ -194,6 +187,21 @@ class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging { log.info("Finished asking for containers") } + private def startRMClient(): AMRMClientAsync[ContainerRequest] = { +val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this) +client.init(conf) +client.start() +client + } + + private def registerAppMaster(host: String, port: Int, url: String) = { +// Register with ResourceManager +log.info("Registering application") +val registrationResponse = rmClient.registerApplicationMaster(host, port, url) +log.info("Registered application") +registrationResponse + } + private def setupMessaging(jobId: String): Unit = { val cf = new ActiveMQConnectionFactory(address) @@ -225,20 +233,6 @@ class ApplicationMaster extends
[jira] [Commented] (AMATERASU-26) Pipeline tasks (sub-Yarn jobs) run as "yarn" user instead of inheriting the user in which the amaterasu job was submitted
[ https://issues.apache.org/jira/browse/AMATERASU-26?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480622#comment-16480622 ] ASF GitHub Bot commented on AMATERASU-26: - arunma opened a new pull request #18: AMATERASU-26 Pipeline tasks runs as "yarn" user instead of inheriting… URL: https://github.com/apache/incubator-amaterasu/pull/18 … the user This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Pipeline tasks (sub-Yarn jobs) runs as "yarn" user instead of inhering the > user in which the amaterasu job was submitted > > > Key: AMATERASU-26 > URL: https://issues.apache.org/jira/browse/AMATERASU-26 > Project: AMATERASU > Issue Type: Improvement >Reporter: Arun Manivannan >Assignee: Arun Manivannan >Priority: Major > Fix For: 0.2.1-incubating > > Attachments: TaskJobsRunAsYarnUser.png > > > Referring to the screenshot, the original user with which the amaterasu job > was submitted was username "amaterasu". However, the sub jobs of the > pipeline gets submitted with the default user "yarn". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AMATERASU-18) Containers are not influenced by framework configuration
[ https://issues.apache.org/jira/browse/AMATERASU-18?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16415057#comment-16415057 ] ASF GitHub Bot commented on AMATERASU-18: - roadan closed pull request #13: AMATERASU-18 URL: https://github.com/apache/incubator-amaterasu/pull/13 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index 414787f..a64ebd0 100755 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,6 @@ bin/ #General .DS_Store -/.gitignore amaterasu-executor/ project/project/ +executor/spark-warehouse/ diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala index 35a6339..7c9f924 100755 --- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala @@ -83,8 +83,8 @@ class ClusterConfig extends Logging { var memoryMB: Int = 1024 def load(props: Properties): Unit = { -if (props.containsKey("yarn.worker.cores")) Master.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int] -if (props.containsKey("yarn.worker.memoryMB")) Master.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int] +if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int] +if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int] } } diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala index 9df79f6..a745581 100644 --- 
a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala +++ b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala @@ -18,4 +18,8 @@ package org.apache.amaterasu.common.dataobjects import org.apache.amaterasu.common.runtime.Environment + +/* TODO: Future eyal and yaniv - The TaskData class should support overriding configurations for execData configurations +// more specifiably, if execData holds configurations for spark setup (vcores/memory) a task should be able to override those +*/ case class TaskData(src: String, env: Environment, groupId: String, typeId: String, exports: Map[String, String]) diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala index d1c33bb..ff56d8c 100644 --- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala +++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala @@ -48,7 +48,7 @@ class SparkRunnersProvider extends RunnersProvider with Logging { private var conf: Option[Map[String, Any]] = _ private var executorEnv: Option[Map[String, Any]] = _ - override def init(data: ExecData, + override def init(execData: ExecData, jobId: String, outStream: ByteArrayOutputStream, notifier: Notifier, @@ -63,32 +63,32 @@ class SparkRunnersProvider extends RunnersProvider with Logging { var jars = Seq.empty[String] -if (data.deps != null) { - jars ++= getDependencies(data.deps) +if (execData.deps != null) { + jars ++= getDependencies(execData.deps) } -if (data.pyDeps != null && -data.pyDeps.packages.nonEmpty) { - loadPythonDependencies(data.pyDeps, notifier) +if (execData.pyDeps != null && + execData.pyDeps.packages.nonEmpty) { + loadPythonDependencies(execData.pyDeps, notifier) } -conf = 
data.configurations.get("spark") -executorEnv = data.configurations.get("spark_exec_env") +conf = execData.configurations.get("spark") +executorEnv = execData.configurations.get("spark_exec_env") val sparkAppName = s"job_${jobId}_executor_$executorId" SparkRunnerHelper.notifier = notifier -val spark = SparkRunnerHelper.createSpark(data.env, sparkAppName, jars, conf, executorEnv, config, hostName) +val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName) -lazy val sparkScalaRunner = SparkScalaRunner(data.env, jobId, spark, outStream, notifier, jars) -sparkScalaRunner.initializeAmaContext(data.env) +lazy val
[jira] [Commented] (AMATERASU-18) Containers are not influenced by framework configuration
[ https://issues.apache.org/jira/browse/AMATERASU-18?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16376087#comment-16376087 ] ASF GitHub Bot commented on AMATERASU-18: - eyalbenivri opened a new pull request #13: AMATERASU-18 URL: https://github.com/apache/incubator-amaterasu/pull/13 added logic to read ama-workers / spark drivers(am) from configuration. Spark is first, second amaterasu configs, third is defaults. check against maximum cluster resources is also in place. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Containers are not influenced by framework configuration > > > Key: AMATERASU-18 > URL: https://issues.apache.org/jira/browse/AMATERASU-18 > Project: AMATERASU > Issue Type: Bug >Reporter: Yaniv Rodenski >Assignee: Eyal Ben Ivri >Priority: Major > Fix For: 0.2.0-incubating > > > Currently, the containers sizing (YARN and Mesos) is derived from the default > configuration and not from the configuration of the framework. We need to > connect the spark properties to the container setup -- This message was sent by Atlassian JIRA (v7.6.3#76005)