roadan closed pull request #36: Amaterasu-56: Create a Kotlin logger
implementation
URL: https://github.com/apache/incubator-amaterasu/pull/36
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/build.gradle b/build.gradle
index 06a07e82..c7b08dbe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,11 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+buildscript {
+ ext.kotlin_version = '1.3.0'
+
+ repositories {
+ mavenCentral()
+ }
+
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ }
+}
+
plugins {
id "org.nosphere.apache.rat" version "0.3.1"
+ id "org.jetbrains.kotlin.jvm" version "1.3.0"
+ id "distribution"
}
apply plugin: 'distribution'
+apply plugin: 'kotlin'
apply plugin: 'project-report'
htmlDependencyReport {
@@ -63,4 +78,12 @@ rat {
tasks.withType(Test) {
maxParallelForks = 1
+}
+
+repositories {
+ mavenCentral()
+}
+
+dependencies {
+ compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
}
\ No newline at end of file
diff --git a/common/build.gradle b/common/build.gradle
index 5a0a211c..9a456ced 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -16,6 +16,7 @@
*/
plugins {
id 'com.github.johnrengelman.shadow' version '1.2.4'
+ id "org.jetbrains.kotlin.jvm"
id 'scala'
}
@@ -45,6 +46,10 @@ dependencies {
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations',
version: '2.9.4'
+
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
+ compile "org.jetbrains.kotlin:kotlin-reflect"
+
// currently we have to use this specific mesos version to prevent from
// clashing with spark
compile('org.apache.mesos:mesos:0.22.2:shaded-protobuf') {
@@ -60,9 +65,50 @@ dependencies {
testCompile 'junit:junit:4.11'
testCompile 'org.scalatest:scalatest_2.11:3.0.1'
testCompile 'org.scala-lang:scala-library:2.11.8'
+ testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+ testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+ testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+ // spek requires kotlin-reflect, can be omitted if already in the classpath
+ testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
}
task copyToHome() {
}
+sourceSets {
+ test {
+ resources.srcDirs += [file('src/test/resources')]
+ }
+
+ // this is done so Scala will compile before Kotlin
+ main {
+ kotlin {
+ srcDirs = ['src/main/kotlin']
+ }
+ scala {
+ srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
+ }
+ java {
+ srcDirs = ['src/main/java']
+ }
+ }
+}
+
+compileKotlin{
+ kotlinOptions.jvmTarget = "1.8"
+}
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
+
+compileScala {
+ dependsOn compileJava
+ classpath += files(compileJava.destinationDir) +
files(compileKotlin.destinationDir)
+}
+
+compileJava {
+ dependsOn compileKotlin
+}
+
diff --git
a/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
b/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
new file mode 100644
index 00000000..3f3413fa
--- /dev/null
+++ b/common/src/main/java/org/apache/amaterasu/common/logging/Logging.java
@@ -0,0 +1,10 @@
+package org.apache.amaterasu.common.logging;
+
+import org.slf4j.Logger;
+
+/**
+ * Created by Eran Bartenstein (p765790) on 5/11/18.
+ */
+public abstract class Logging extends KLogging {
+ protected Logger log = getLog();
+}
diff --git
a/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
new file mode 100644
index 00000000..24853c53
--- /dev/null
+++
b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/enums/ActionStatus.kt
@@ -0,0 +1,13 @@
+package org.apache.amaterasu.common.configuration.enums
+
+/**
+ * Created by Eran Bartenstein on 21/10/18.
+ */
+enum class ActionStatus (val value: String) {
+ pending("pending"),
+ queued("queued"),
+ started("started"),
+ complete("complete"),
+ failed("failed"),
+ canceled("canceled")
+}
diff --git
a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala
b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
old mode 100755
new mode 100644
similarity index 63%
rename from
common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala
rename to
common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
index 78528c4d..7e19db2c
---
a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionData.scala
+++
b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ActionData.kt
@@ -16,17 +16,21 @@
*/
package org.apache.amaterasu.common.dataobjects
-import
org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import scala.collection.mutable.ListBuffer
-case class ActionData(var status: ActionStatus,
- name: String,
- src: String,
- groupId: String,
- typeId: String,
- id: String,
- exports: Map[String, String],
- nextActionIds: ListBuffer[String]) {
- var errorActionId: String = _
+
+/*
+ Adding default values just for the sake of Scala
+ */
+data class ActionData(var status: ActionStatus = ActionStatus.pending,
+ var name: String= "",
+ var src: String= "",
+ var groupId: String= "",
+ var typeId: String= "",
+ var id: String= "",
+ var exports: Map<String, String> = mutableMapOf(),
+ var nextActionIds: List<String> = listOf()) {
+ lateinit var errorActionId: String
+
}
diff --git
a/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt
b/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt
new file mode 100644
index 00000000..2b4e4112
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/logging/KLogging.kt
@@ -0,0 +1,10 @@
+package org.apache.amaterasu.common.logging
+
+import org.slf4j.LoggerFactory
+
+/**
+ * Created by Eran Bartenstein on 5/11/18.
+ */
+abstract class KLogging {
+ protected var log = LoggerFactory.getLogger(this.javaClass.name)
+}
diff --git
a/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala
b/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala
deleted file mode 100755
index 4d2afa3a..00000000
---
a/common/src/main/scala/org/apache/amaterasu/common/configuration/enums/ActionStatus.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.configuration.enums
-
-object ActionStatus extends Enumeration {
- type ActionStatus = Value
- val pending = Value("pending")
- val queued = Value("queued")
- val started = Value("started")
- val complete = Value("complete")
- val failed = Value("failed")
- val canceled = Value("canceled")
-}
\ No newline at end of file
diff --git
a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
index 5fa2d749..75be6e75 100644
---
a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
+++
b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ActionDataHelper.scala
@@ -18,6 +18,7 @@ package org.apache.amaterasu.common.dataobjects
import com.google.gson.Gson
+/*
object ActionDataHelper {
private val gson = new Gson
def toJsonString(actionData: ActionData): String = {
@@ -25,6 +26,7 @@ object ActionDataHelper {
}
def fromJsonString(jsonString: String) : ActionData = {
- gson.fromJson[ActionData](jsonString, ActionData.getClass)
+ gson.fromJson[ActionData](jsonString, new ActionData().getClass)
}
-}
\ No newline at end of file
+}
+*/
\ No newline at end of file
diff --git
a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
index fe692606..ee71f854 100755
---
a/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
+++
b/common/src/main/scala/org/apache/amaterasu/common/execution/actions/Notifier.scala
@@ -19,8 +19,9 @@ package org.apache.amaterasu.common.execution.actions
import com.fasterxml.jackson.annotation.JsonProperty
import
org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
import
org.apache.amaterasu.common.execution.actions.NotificationType.NotificationType
+import org.apache.amaterasu.common.logging.Logging
-abstract class Notifier {
+abstract class Notifier extends Logging {
def info(msg: String)
diff --git
a/common/src/main/scala/org/apache/amaterasu/common/logging/Logging.scala
b/common/src/main/scala/org/apache/amaterasu/common/logging/Logging.scala
deleted file mode 100755
index 5c9f6560..00000000
--- a/common/src/main/scala/org/apache/amaterasu/common/logging/Logging.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.logging
-
-import org.slf4j.LoggerFactory
-
-trait Logging {
- protected lazy val log = LoggerFactory.getLogger(getClass.getName)
-}
-
diff --git
a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
index 0c2edf80..90e624b5 100644
---
a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
+++
b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
@@ -25,7 +25,7 @@ import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.amaterasu.common.execution.actions.{Notification,
NotificationLevel, NotificationType, Notifier}
import org.apache.amaterasu.common.logging.Logging
-class ActiveNotifier extends Notifier with Logging {
+class ActiveNotifier extends Notifier {
var producer: MessageProducer = _
var session: Session = _
diff --git
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index fff2a81d..2c087935 100755
---
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++
b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -32,7 +32,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
-class MesosActionsExecutor extends Executor with Logging {
+class MesosActionsExecutor extends Logging with Executor {
var master: String = _
var executorDriver: ExecutorDriver = _
diff --git
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
index a091c1b7..fcb453a3 100755
---
a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
+++
b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
@@ -23,7 +23,7 @@ import org.apache.amaterasu.common.logging.Logging
import org.apache.mesos.ExecutorDriver
-class MesosNotifier(driver: ExecutorDriver) extends Notifier with Logging {
+class MesosNotifier(driver: ExecutorDriver) extends Notifier {
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
diff --git
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index b5f8700c..282de685 100644
---
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -61,7 +61,7 @@ class ActionsExecutor extends Logging {
// launched with args:
//s"'${jobManager.jobId}' '${config.master}' '${actionData.name}'
'${URLEncoder.encode(gson.toJson(taskData), "UTF-8")}'
'${URLEncoder.encode(gson.toJson(execData), "UTF-8")}'
'${actionData.id}-${container.getId.getContainerId}'"
-object ActionsExecutorLauncher extends App with Logging {
+object ActionsExecutorLauncher extends Logging with App {
val hostName = InetAddress.getLocalHost.getHostName
diff --git
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
index 841fe425..831cfc8e 100644
---
a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
+++
b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
@@ -21,7 +21,7 @@ import org.apache.amaterasu.common.logging.Logging
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
-class YarnNotifier(conf: YarnConfiguration) extends Notifier with Logging {
+class YarnNotifier(conf: YarnConfiguration) extends Notifier {
var rpc: YarnRPC = YarnRPC.create(conf)
diff --git
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 7f911420..ce3edb9a 100644
---
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -17,7 +17,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
case "mesos" =>
s"env AMA_NODE=${sys.env("AMA_NODE")} env
MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env
SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz
" +
s"java -cp
executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/*
" +
- s"-Dscala.usejavacp=true -Djava.library.path=$libPath
org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId
${conf.master} ${actionData.name}.stripMargin"
+ s"-Dscala.usejavacp=true -Djava.library.path=$libPath
org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId
${conf.master} ${actionData.getName}.stripMargin"
case "yarn" => "/bin/bash ./miniconda.sh -b -p $PWD/miniconda && " +
s"/bin/bash spark/bin/load-spark-env.sh && " +
s"java -cp
spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/
" +
@@ -25,7 +25,7 @@ class PySparkRunnerProvider extends RunnerSetupProvider {
"-Dscala.usejavacp=true " +
"-Dhdp.version=2.6.1.0-129 " +
"org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
- s"'$jobId' '${conf.master}' '${actionData.name}'
'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}'
'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}'
'$executorId' '$callbackAddress' " +
+ s"'$jobId' '${conf.master}' '${actionData.getName}'
'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}'
'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}'
'$executorId' '$callbackAddress' " +
s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
case _ => ""
diff --git
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index 8dbc36d6..5a8bf37b 100644
---
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -32,16 +32,16 @@ class SparkScalaRunnerProvider extends RunnerSetupProvider {
override def getCommand(jobId: String, actionData: ActionData, env: String,
executorId: String, callbackAddress: String): String = conf.mode match {
case "mesos" =>
s"env AMA_NODE=${sys.env("AMA_NODE")} env
MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so env
SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${conf.Webserver.Port}/dist/spark-${conf.Webserver.sparkVersion}.tgz
" +
- s"java -cp
executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/*
" +
- s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
- s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor
$jobId ${conf.master} ${actionData.name}".stripMargin
+ s"java -cp
executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.Webserver.sparkVersion}/jars/*
" +
+ s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
+ s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor
$jobId ${conf.master} ${actionData.getName}".stripMargin
case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
s"java -cp
spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARN.hadoopHomeDir}/conf/
" +
"-Xmx2G " +
"-Dscala.usejavacp=true " +
"-Dhdp.version=2.6.1.0-129 " +
"org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
- s"'$jobId' '${conf.master}' '${actionData.name}'
'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}'
'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}'
'$executorId' '$callbackAddress' " +
+ s"'$jobId' '${conf.master}' '${actionData.getName}'
'${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}'
'${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}'
'$executorId' '$callbackAddress' " +
s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
s"2> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr "
case _ => ""
diff --git
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
index 525c4f5d..5d566a0b 100644
---
a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
+++
b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkShellScalaRunnerProvider.scala
@@ -9,13 +9,13 @@ class SparkShellScalaRunnerProvider extends
RunnerSetupProvider {
private var conf: ClusterConfig = _
override def getCommand(jobId: String, actionData: ActionData, env: String,
executorId: String, callbackAddress: String): String =
- s"$$SPARK_HOME/bin/spark-shell ${actionData.src} --jars
spark-runtime-${conf.version}.jar"
+ s"$$SPARK_HOME/bin/spark-shell ${actionData.getSrc} --jars
spark-runtime-${conf.version}.jar"
override def getRunnerResources: Array[String] =
Array[String]()
def getActionResources(jobId: String, actionData: ActionData): Array[String]
=
- Array[String](s"$jobId/${actionData.name}/${actionData.src}")
+ Array[String](s"$jobId/${actionData.getName}/${actionData.getSrc}")
override def getActionDependencies(jobId: String, actionData: ActionData):
Array[String] = Array[String]()
diff --git
a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
index e6c0a7d0..a48aaa06 100644
---
a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
+++
b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.sys.process._
-class SparkRunnersProvider extends RunnersProvider with Logging {
+class SparkRunnersProvider extends Logging with RunnersProvider {
private val runners = new TrieMap[String, AmaterasuRunner]
private var shellLoger = ProcessLogger(
diff --git
a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
index 90f8c680..a60c827d 100644
---
a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
+++
b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
@@ -33,7 +33,7 @@ import scala.sys.process.{Process, ProcessLogger}
-class PySparkRunner extends AmaterasuRunner with Logging {
+class PySparkRunner extends Logging with AmaterasuRunner {
var proc: Process = _
var notifier: Notifier = _
diff --git
a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
index 16cb97bb..430e75ab 100644
---
a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
+++
b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/utilities/TestNotifier.scala
@@ -20,7 +20,7 @@ import org.apache.amaterasu.common.execution.actions.Notifier
import org.apache.amaterasu.common.logging.Logging
-class TestNotifier extends Notifier with Logging {
+class TestNotifier extends Notifier {
override def info(msg: String): Unit = {
log.info(msg)
diff --git a/gradle/wrapper/gradle-wrapper.properties
b/gradle/wrapper/gradle-wrapper.properties
index a95009c3..7dc503f1 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 61b93092..6ca9513b 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -15,7 +15,6 @@
* limitations under the License.
*/
buildscript {
- ext.kotlin_version = '1.2.60'
repositories {
mavenCentral()
@@ -104,12 +103,3 @@ compileKotlin {
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
-//
-//kotlin {
-// experimental {
-// coroutines 'enable'
-// }
-//}
-
-//task copyToHome() {
-//}
\ No newline at end of file
diff --git
a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
new file mode 100644
index 00000000..e4f0e3ec
--- /dev/null
+++
b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/actions/Action.kt
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.actions
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.common.logging.Logging
+import org.apache.curator.framework.CuratorFramework
+
+/**
+ * Created by Eran Bartenstein on 19/10/18.
+ */
+abstract class Action : KLogging() {
+ lateinit var actionPath: String
+ lateinit var actionId: String
+ lateinit var client: CuratorFramework
+ lateinit var data: ActionData
+ abstract fun execute()
+ abstract fun handleFailure(message: String) : String
+
+ fun announceStart() {
+ log.debug("Starting action ${data.name} of group ${data.groupId} and
type ${data.typeId}")
+ client.setData().forPath(actionPath,
ActionStatus.started.value.toByteArray())
+ data.status = ActionStatus.started
+ }
+
+ fun announceQueued() {
+ log.debug("Action ${data.name} of group ${data.groupId} and of type
${data.typeId} is queued for execution")
+ client.setData().forPath(actionPath,
ActionStatus.queued.value.toByteArray())
+ data.status = ActionStatus.queued
+ }
+
+ fun announceComplete() {
+ log.debug("Action ${data.name} of group ${data.groupId} and of type
${data.typeId} complete")
+ client.setData().forPath(actionPath,
ActionStatus.complete.value.toByteArray())
+ data.status = ActionStatus.complete
+ }
+
+ fun announceCanceled() {
+ log.debug("Action ${data.name} of group ${data.groupId} and of type
${data.typeId} was canceled")
+ client.setData().forPath(actionPath,
ActionStatus.canceled.value.toByteArray())
+ data.status = ActionStatus.canceled
+ }
+
+ protected fun announceFailure() {}
+
+}
diff --git
a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
index 7a1bc6c9..e4179055 100755
---
a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
+++
b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.amaterasu.leader.common.utilities
-
+import scala.collection.JavaConverters._
import java.io.{File, FileInputStream}
import java.nio.file.{Files, Paths}
@@ -43,12 +43,15 @@ object DataLoader extends Logging {
ymlMapper.registerModule(DefaultScalaModule)
def getTaskData(actionData: ActionData, env: String): TaskData = {
- val srcFile = actionData.src
+ val srcFile = actionData.getSrc
val src = Source.fromFile(s"repo/src/$srcFile").mkString
val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString
val envData = ymlMapper.readValue(envValue, classOf[Environment])
- TaskData(src, envData, actionData.groupId, actionData.typeId,
actionData.exports)
+
+ val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO:
Remove me as fast as you can
+
+ TaskData(src, envData, actionData.getGroupId, actionData.getTypeId,
exports)
}
def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
diff --git a/leader/build.gradle b/leader/build.gradle
index 114bbd32..dc244fc1 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -18,6 +18,7 @@ plugins {
id "com.github.johnrengelman.shadow" version "1.2.4"
id 'com.github.maiflai.scalatest' version '0.22'
id 'scala'
+ id 'org.jetbrains.kotlin.jvm'
id 'java'
}
@@ -62,6 +63,8 @@ dependencies {
compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
compile group: 'org.apache.activemq', name: 'activemq-broker', version:
'5.15.3'
compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
+ compile "org.jetbrains.kotlin:kotlin-reflect"
runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store',
version: '5.15.3'
testCompile project(':common')
@@ -82,8 +85,11 @@ sourceSets {
// this is done so Scala will compile before Java
main {
+ kotlin {
+ srcDirs = ['src/main/kotlin']
+ }
scala {
- srcDirs = ['src/main/scala', 'src/main/java']
+ srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
}
java {
srcDirs = []
@@ -109,4 +115,11 @@ task copyToHomeBin(type: Copy) {
task copyToHome() {
dependsOn copyToHomeRoot
dependsOn copyToHomeBin
-}
\ No newline at end of file
+}
+
+compileKotlin{
+ kotlinOptions.jvmTarget = "1.8"
+}
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala
similarity index 82%
rename from
leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala
rename to
leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala
index ca29f0cb..d9be4dd7 100755
---
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/SequentialAction.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/common/actions/SequentialAction.scala
@@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.execution.actions
+package org.apache.amaterasu.leader.common.actions
+import java.util
import java.util.concurrent.BlockingQueue
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.actions.Action
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
+import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
class SequentialAction extends Action {
@@ -51,16 +54,16 @@ class SequentialAction extends Action {
override def handleFailure(message: String): String = {
- println(s"Part ${data.name} of group ${data.groupId} and of type
${data.typeId} failed on attempt $attempt with message: $message")
+ println(s"Part ${data.getName} of group ${data.getGroupId} and of type
${data.getTypeId} failed on attempt $attempt with message: $message")
attempt += 1
if (attempt <= attempts) {
- data.id
+ data.getId
}
else {
announceFailure()
println(s"===> moving to err action ${data.errorActionId}")
- data.status = ActionStatus.failed
+ data.setStatus ( ActionStatus.failed )
data.errorActionId
}
@@ -91,7 +94,8 @@ object SequentialAction {
action.attempts = attempts
action.jobId = jobId
- action.data = ActionData(ActionStatus.pending, name, src, groupId, typeId,
action.actionId, exports, new ListBuffer[String])
+ val javaExports = exports.asJava
+ action.data = new ActionData(ActionStatus.pending, name, src, groupId,
typeId, action.actionId, javaExports, new util.ArrayList[String]())
action.jobsQueue = queue
action.client = zkClient
@@ -121,7 +125,7 @@ object ErrorAction {
action.actionId =
action.actionPath.substring(action.actionPath.indexOf('-') + 1).replace("/",
"-")
action.jobId = jobId
- action.data = ActionData(ActionStatus.pending, name, src, groupId, typeId,
action.actionId, Map.empty, new ListBuffer[String])
+ action.data = new ActionData(ActionStatus.pending, name, src, groupId,
typeId, action.actionId, new util.HashMap[String, String](), new
util.ArrayList[String]())
action.jobsQueue = queue
action.client = zkClient
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
index c271f50a..e08489cd 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/dsl/JobParser.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.leader.execution.JobManager
-import org.apache.amaterasu.leader.execution.actions.{Action, ErrorAction,
SequentialAction}
+import org.apache.amaterasu.leader.common.actions.{ErrorAction,
SequentialAction}
+import org.apache.amaterasu.leader.common.execution.actions.Action
import org.apache.curator.framework.CuratorFramework
import scala.collection.JavaConverters._
@@ -103,17 +104,18 @@ object JobParser {
)
//updating the list of frameworks setup
- manager.frameworks.getOrElseUpdate(action.data.groupId,
- new mutable.HashSet[String]())
- .add(action.data.typeId)
+ manager.frameworks.getOrElseUpdate(action.data.getGroupId,
+ new mutable.HashSet[String]())
+ .add(action.data.getTypeId)
- if (manager.head == null)
+ if (manager.head == null) {
manager.head = action
+ }
- if (previous != null)
- previous.data.nextActionIds.append(action.actionId)
-
+ if (previous != null) {
+ previous.data.getNextActionIds.add(action.actionId)
+ }
manager.registerAction(action)
val errorNode = actionData.path("error")
@@ -123,18 +125,18 @@ object JobParser {
val errorAction = parseErrorAction(
errorNode,
manager.jobId,
- action.data.id,
+ action.data.getId,
actionsQueue,
manager.client
)
- action.data.errorActionId = errorAction.data.id
+ action.data.errorActionId = errorAction.data.getId
manager.registerAction(errorAction)
//updating the list of frameworks setup
- manager.frameworks.getOrElseUpdate(errorAction.data.groupId,
+ manager.frameworks.getOrElseUpdate(errorAction.data.getGroupId,
new mutable.HashSet[String]())
- .add(errorAction.data.typeId)
+ .add(errorAction.data.getTypeId)
}
parseActions(actions.tail, manager, actionsQueue, attempts, action)
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
index 38f4b7c5..70642dbf 100755
---
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/execution/JobManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.BlockingQueue
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.execution.actions.Action
+import org.apache.amaterasu.leader.common.execution.actions.Action
import org.apache.curator.framework.CuratorFramework
import scala.collection.concurrent.TrieMap
@@ -54,9 +54,9 @@ class JobManager extends Logging {
}
- def outOfActions: Boolean = !registeredActions.values.exists(a =>
a.data.status == ActionStatus.pending ||
- a.data.status == ActionStatus.queued ||
- a.data.status == ActionStatus.started)
+ def outOfActions: Boolean = !registeredActions.values.exists(a =>
a.data.getStatus == ActionStatus.pending ||
+ a.data.getStatus == ActionStatus.queued ||
+ a.data.getStatus == ActionStatus.started)
/**
* getNextActionData returns the data of the next action to be executed if
such action
* exists
@@ -68,7 +68,7 @@ class JobManager extends Logging {
val nextAction: ActionData = executionQueue.poll()
if (nextAction != null) {
- registeredActions(nextAction.id).announceStart
+ registeredActions(nextAction.getId).announceStart
}
nextAction
@@ -102,8 +102,7 @@ class JobManager extends Logging {
val action = registeredActions.get(actionId).get
action.announceComplete
- action.data.nextActionIds.foreach(id =>
- registeredActions.get(id).get.execute())
+ action.data.getNextActionIds.toArray.foreach(id =>
registeredActions.get(id.toString).get.execute())
// we don't need the error action anymore
if (action.data.errorActionId != null)
@@ -131,11 +130,11 @@ class JobManager extends Logging {
def cancelFutureActions(action: Action): Unit = {
- if (action.data.status != ActionStatus.failed)
+ if (action.data.getStatus != ActionStatus.failed)
action.announceCanceled
- action.data.nextActionIds.foreach(id =>
- cancelFutureActions(registeredActions.get(id).get))
+ action.data.getNextActionIds.toArray.foreach(id =>
+ cancelFutureActions(registeredActions.get(id.toString).get))
}
/**
@@ -185,4 +184,4 @@ object JobManager {
}
-}
\ No newline at end of file
+}
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
deleted file mode 100755
index f5409972..00000000
---
a/leader/src/main/scala/org/apache/amaterasu/leader/execution/actions/Action.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.execution.actions
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.curator.framework.CuratorFramework
-
-trait Action extends Logging {
-
- // this is the znode path for the action
- var actionPath: String = _
- var actionId: String = _
-
- var data: ActionData = _
- var client: CuratorFramework = _
-
- def execute(): Unit
-
- def handleFailure(message: String): String
-
- /**
- * The announceStart register the beginning of the of the task with ZooKeper
- */
- def announceStart: Unit = {
-
- log.debug(s"Starting action ${data.name} of group ${data.groupId} and type
${data.typeId}")
- client.setData().forPath(actionPath,
ActionStatus.started.toString.getBytes)
- data.status = ActionStatus.started
- }
-
- def announceQueued: Unit = {
-
- log.debug(s"Action ${data.name} of group ${data.groupId} and of type
${data.typeId} is queued for execution")
- client.setData().forPath(actionPath, ActionStatus.queued.toString.getBytes)
- data.status = ActionStatus.queued
- }
-
- def announceComplete: Unit = {
-
- log.debug(s"Action ${data.name} of group ${data.groupId} and of type
${data.typeId} completed")
- client.setData().forPath(actionPath,
ActionStatus.complete.toString.getBytes)
- data.status = ActionStatus.complete
- }
-
- def announceCanceled: Unit = {
-
- log.debug(s"Action ${data.name} of group ${data.groupId} and of type
${data.typeId} was canceled")
- client.setData().forPath(actionPath,
ActionStatus.canceled.toString.getBytes)
- data.status = ActionStatus.canceled
- }
- protected def announceFailure(): Unit = {}
-
-}
\ No newline at end of file
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala
index 737f59d9..0f234380 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/Launcher.scala
@@ -24,7 +24,7 @@ import org.apache.amaterasu.leader.Kami
import org.apache.amaterasu.leader.mesos.schedulers.ClusterScheduler
import org.apache.mesos.{MesosSchedulerDriver, Protos}
-object Launcher extends App with Logging {
+object Launcher extends Logging with App {
println(
"""
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
index 0ffdb7a3..2adff07a 100755
---
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
@@ -20,7 +20,7 @@ import org.apache.amaterasu.common.logging.Logging
import org.apache.mesos.Protos._
import org.apache.mesos.{Executor, ExecutorDriver}
-object JobExecutor extends Executor with Logging {
+object JobExecutor extends Logging with Executor {
override def shutdown(driver: ExecutorDriver): Unit = {}
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
index 4b1a74c1..68c8f85a 100755
---
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
@@ -20,7 +20,7 @@ import org.apache.amaterasu.common.logging.Logging
import org.apache.mesos.Protos.{Resource, Value}
import org.apache.mesos.Scheduler
-trait AmaterasuScheduler extends Scheduler with Logging {
+trait AmaterasuScheduler extends Logging with Scheduler {
def createScalarResource(name: String, value: Double): Resource = {
Resource.newBuilder
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a407a0db..bcd7923d 100755
---
a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -30,7 +30,6 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import
org.apache.amaterasu.common.configuration.enums.ActionStatus.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import
org.apache.amaterasu.common.execution.actions.NotificationLevel.NotificationLevel
import org.apache.amaterasu.common.execution.actions.{Notification,
NotificationLevel, NotificationType}
@@ -156,19 +155,19 @@ class JobScheduler extends AmaterasuScheduler {
try {
val actionData = jobManager.getNextActionData
if (actionData != null) {
- val taskId =
Protos.TaskID.newBuilder().setValue(actionData.id).build()
+ val taskId =
Protos.TaskID.newBuilder().setValue(actionData.getId).build()
// setting up the configuration files for the container
- val envYaml =
configManager.getActionConfigContent(actionData.name, "") //TODO: replace with
the value in actionData.config
- writeConfigFile(envYaml, jobManager.jobId, actionData.name,
"env.yaml")
+ val envYaml =
configManager.getActionConfigContent(actionData.getName, "") //TODO: replace
with the value in actionData.config
+ writeConfigFile(envYaml, jobManager.jobId, actionData.getName,
"env.yaml")
val dataStores = DataLoader.getTaskData(actionData, env).exports
val writer = new StringWriter()
yamlMapper.writeValue(writer, dataStores)
val dataStoresYaml = writer.toString
- writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name,
"datastores.yaml")
+ writeConfigFile(dataStoresYaml, jobManager.jobId,
actionData.getName, "datastores.yaml")
- writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName:
${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+ writeConfigFile(s"jobId: ${jobManager.jobId}\nactionName:
${actionData.getName}", jobManager.jobId, actionData.getName, "runtime.yaml")
offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
@@ -180,8 +179,8 @@ class JobScheduler extends AmaterasuScheduler {
slaveActions.put(taskId.getValue, ActionStatus.started)
- val frameworkProvider =
frameworkFactory.providers(actionData.groupId)
- val runnerProvider =
frameworkProvider.getRunnerProvider(actionData.typeId)
+ val frameworkProvider =
frameworkFactory.providers(actionData.getGroupId)
+ val runnerProvider =
frameworkProvider.getRunnerProvider(actionData.getTypeId)
// searching for an executor that already exist on the slave, if
non exist
// we create a new one
@@ -198,7 +197,7 @@ class JobScheduler extends AmaterasuScheduler {
//creating the command
// TODO: move this into the runner provider somehow
- copy(get(s"repo/src/${actionData.src}"),
get(s"dist/${jobManager.jobId}/${actionData.name}/${actionData.src}"),
REPLACE_EXISTING)
+ copy(get(s"repo/src/${actionData.getSrc}"),
get(s"dist/${jobManager.jobId}/${actionData.getName}/${actionData.getSrc}"),
REPLACE_EXISTING)
println(s"===> ${runnerProvider.getCommand(jobManager.jobId,
actionData, env, executorId, "")}")
val command = CommandInfo
@@ -210,26 +209,26 @@ class JobScheduler extends AmaterasuScheduler {
.setExtract(false)
.build())
- // Getting env.yaml
- command.addUris(URI.newBuilder
-
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/env.yaml")
- .setExecutable(false)
- .setExtract(true)
- .build())
+ // Getting env.yaml
+ command.addUris(URI.newBuilder
+
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/env.yaml")
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
- // Getting datastores.yaml
- command.addUris(URI.newBuilder
-
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/datastores.yaml")
- .setExecutable(false)
- .setExtract(true)
- .build())
+ // Getting datastores.yaml
+ command.addUris(URI.newBuilder
+
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/datastores.yaml")
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
- // Getting runtime.yaml
- command.addUris(URI.newBuilder
-
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.name}/runtime.yaml")
- .setExecutable(false)
- .setExtract(true)
- .build())
+ // Getting runtime.yaml
+ command.addUris(URI.newBuilder
+
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/${jobManager.jobId}/${actionData.getName}/runtime.yaml")
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
// Getting framework resources
frameworkProvider.getGroupResources.foreach(f =>
command.addUris(URI.newBuilder
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
index d1d0c531..38c90c7e 100644
---
a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
@@ -22,7 +22,7 @@ import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.logging.Logging
-abstract class BaseJobLauncher extends App with Logging {
+abstract class BaseJobLauncher extends Logging with App {
def run(args: Args, config: ClusterConfig, resume: Boolean): Unit = ???
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 1bbaa157..23700f80 100644
---
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -53,7 +53,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
-class ApplicationMaster extends AMRMClientAsync.CallbackHandler with Logging {
+class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
var capability: Resource = _
@@ -170,7 +170,7 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
val actionData = jobManager.getNextActionData
if (actionData != null) {
- val frameworkProvider = frameworkFactory.providers(actionData.groupId)
+ val frameworkProvider =
frameworkFactory.providers(actionData.getGroupId)
val driverConfiguration = frameworkProvider.getDriverConfiguration
var mem: Int = driverConfiguration.getMemory
@@ -221,14 +221,14 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
private def askContainer(actionData: ActionData): Unit = {
actionsBuffer.add(actionData)
- log.info(s"About to ask container for action ${actionData.id}. Action
buffer size is: ${actionsBuffer.size()}")
+ log.info(s"About to ask container for action ${actionData.getId}. Action
buffer size is: ${actionsBuffer.size()}")
// we have an action to schedule, let's request a container
val priority: Priority = Records.newRecord(classOf[Priority])
priority.setPriority(1)
val containerReq = new ContainerRequest(capability, null, null, priority)
rmClient.addContainerRequest(containerReq)
- log.info(s"Asked container for action ${actionData.id}")
+ log.info(s"Asked container for action ${actionData.getId}")
}
@@ -245,10 +245,10 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
val containerTask = Future[ActionData] {
val frameworkFactory = FrameworkProvidersFactory(env, config)
- val framework = frameworkFactory.getFramework(actionData.groupId)
- val runnerProvider = framework.getRunnerProvider(actionData.typeId)
+ val framework = frameworkFactory.getFramework(actionData.getGroupId)
+ val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- val commands: List[String] =
List(runnerProvider.getCommand(jobManager.jobId, actionData, env,
s"${actionData.id}-${container.getId.getContainerId}", address))
+ val commands: List[String] =
List(runnerProvider.getCommand(jobManager.jobId, actionData, env,
s"${actionData.getId}-${container.getId.getContainerId}", address))
log.info("Running container id {}.", container.getId.getContainerId)
log.info("Running container id {} with command '{}'",
container.getId.getContainerId, commands.last)
@@ -283,7 +283,7 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
//adding the framework and executor resources
setupResources(yarnJarPath, framework.getGroupIdentifier, resources,
framework.getGroupIdentifier)
- setupResources(yarnJarPath,
s"${framework.getGroupIdentifier}/${actionData.typeId}", resources,
s"${framework.getGroupIdentifier}-${actionData.typeId}")
+ setupResources(yarnJarPath,
s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources,
s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
ctx.setLocalResources(resources)
@@ -305,9 +305,9 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
askContainer(actionData)
case Success(requestedActionData) =>
- jobManager.actionStarted(requestedActionData.id)
+ jobManager.actionStarted(requestedActionData.getId)
containersIdsToTask.put(container.getId.getContainerId,
requestedActionData)
- log.info(s"launching container succeeded:
${container.getId.getContainerId}; task: ${requestedActionData.id}")
+ log.info(s"launching container succeeded:
${container.getId.getContainerId}; task: ${requestedActionData.getId}")
}
}
@@ -371,15 +371,16 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
val task = containersIdsToTask(containerId)
rmClient.releaseAssignedContainer(status.getContainerId)
+ val taskId = task.getId
if (status.getExitStatus == 0) {
//completedContainersAndTaskIds.put(containerId, task.id)
- jobManager.actionComplete(task.id)
- log.info(s"Container $containerId completed with task ${task.id}
with success.")
+ jobManager.actionComplete(taskId)
+ log.info(s"Container $containerId complete with task ${taskId} with
success.")
} else {
// TODO: Check the getDiagnostics value and see if appropriate
- jobManager.actionFailed(task.id, status.getDiagnostics)
- log.warn(s"Container $containerId completed with task ${task.id}
with failed status code (${status.getExitStatus})")
+ jobManager.actionFailed(taskId, status.getDiagnostics)
+ log.warn(s"Container $containerId complete with task ${taskId} with
failed status code (${status.getExitStatus})")
}
}
}
@@ -456,7 +457,7 @@ class ApplicationMaster extends
AMRMClientAsync.CallbackHandler with Logging {
}
}
-object ApplicationMaster extends App with Logging {
+object ApplicationMaster extends Logging with App {
val parser = Args.getParser
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
index 14c4f439..23f4af68 100644
---
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.{ContainerId,
ContainerStatus}
import org.apache.hadoop.yarn.client.api.async.NMClientAsync
-class YarnNMCallbackHandler extends NMClientAsync.CallbackHandler with Logging
{
+class YarnNMCallbackHandler extends Logging with NMClientAsync.CallbackHandler
{
override def onStartContainerError(containerId: ContainerId, t: Throwable):
Unit = {
log.error(s"Container ${containerId.getContainerId} couldn't start.", t)
diff --git
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
index 24f28ccc..379dd1b7 100644
---
a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
+++
b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
@@ -41,7 +41,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
env: String,
awsEnv: String,
config: ClusterConfig,
- executorJar: LocalResource) extends
AMRMClientAsync.CallbackHandler with Logging {
+ executorJar: LocalResource) extends Logging with
AMRMClientAsync.CallbackHandler {
val gson:Gson = new Gson()
@@ -67,9 +67,9 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
val taskId = containersIdsToTaskIds(containerId)
if (status.getExitStatus == 0) {
completedContainersAndTaskIds.put(containerId, taskId)
- log.info(s"Container $containerId completed with task $taskId with
success.")
+ log.info(s"Container $containerId complete with task $taskId with
success.")
} else {
- log.warn(s"Container $containerId completed with task $taskId with
failed status code (${status.getExitStatus}.")
+ log.warn(s"Container $containerId complete with task $taskId with
failed status code (${status.getExitStatus}.")
val failedTries = failedTasksCounter.getOrElse(taskId, 0)
if (failedTries < MAX_ATTEMPTS_PER_TASK) {
// TODO: notify and ask for a new container
@@ -108,7 +108,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
| java -cp
executor.jar:spark-${config.Webserver.sparkVersion}/lib/*
| -Dscala.usejavacp=true
| -Djava.library.path=/usr/lib
org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
- | ${jobManager.jobId} ${config.master}
${actionData.name} ${gson.toJson(taskData)}
${gson.toJson(execData)}""".stripMargin
+ | ${jobManager.jobId} ${config.master}
${actionData.getName} ${gson.toJson(taskData)}
${gson.toJson(execData)}""".stripMargin
ctx.setCommands(Collections.singletonList(command))
ctx.setLocalResources(Map[String, LocalResource] (
@@ -116,7 +116,7 @@ class YarnRMCallbackHandler(nmClient: NMClientAsync,
))
nmClient.startContainerAsync(container, ctx)
- actionData.id
+ actionData.getId
}
containerTask onComplete {
diff --git
a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala
b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
similarity index 79%
rename from
leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala
rename to
leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
index c89fd2a8..197c703d 100755
---
a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionTests.scala
+++
b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
@@ -16,24 +16,27 @@
*/
package org.apache.amaterasu.common.execution
+import java.util
import java.util.concurrent.LinkedBlockingQueue
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.execution.actions.SequentialAction
+import org.apache.amaterasu.leader.common.actions.SequentialAction
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.apache.zookeeper.CreateMode
import org.scalatest.{FlatSpec, Matchers}
-class ActionTests extends FlatSpec with Matchers {
+import scala.collection.JavaConverters._
+
+class ActionStatusTests extends FlatSpec with Matchers {
// setting up a testing zookeeper server (curator TestServer)
val retryPolicy = new ExponentialBackoffRetry(1000, 3)
val server = new TestingServer(2181, true)
val jobId = s"job_${System.currentTimeMillis}"
- val data = ActionData(ActionStatus.pending, "test_action", "start.scala",
"spark","scala", null, Map.empty , null)
+ val data = new ActionData(ActionStatus.pending, "test_action",
"start.scala", "spark","scala", "0000001", new util.HashMap() ,
List[String]().asJava)
"an Action" should "queue it's ActionData int the job queue when executed"
in {
@@ -44,11 +47,11 @@ class ActionTests extends FlatSpec with Matchers {
client.start()
client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- val action = SequentialAction(data.name, data.src, data.groupId,
data.typeId, Map.empty, jobId, queue, client, 1)
+ val action = SequentialAction(data.getName, data.getSrc, data.getGroupId,
data.getTypeId, Map.empty, jobId, queue, client, 1)
action.execute()
- queue.peek().name should be(data.name)
- queue.peek().src should be(data.src)
+ queue.peek().getName should be(data.getName)
+ queue.peek().getSrc should be(data.getSrc)
}
diff --git
a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
index b15f0bd6..ef47cc17 100755
---
a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
+++
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
@@ -49,21 +49,21 @@ class JobExecutionTests extends FlatSpec with Matchers {
"a job" should "queue the first action when the JobManager.start method is
called " in {
job.start
- queue.peek.name should be ("start")
+ queue.peek.getName should be ("start")
// making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000000")
+ val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
new String(actionStatus) should be("queued")
}
it should "return the start action when calling getNextAction and dequeue
it" in {
- job.getNextActionData.name should be ("start")
+ job.getNextActionData.getName should be ("start")
queue.size should be (0)
// making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000000")
+ val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
new String(actionStatus) should be("started")
}
@@ -73,17 +73,17 @@ class JobExecutionTests extends FlatSpec with Matchers {
job.actionComplete("0000000000")
// making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000000")
+ val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
new String(actionStatus) should be("complete")
}
"the next step2 job" should "be queued as a result of the completion" in {
- queue.peek.name should be ("step2")
+ queue.peek.getName should be ("step2")
// making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000001")
+ val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
new String(actionStatus) should be("queued")
}
@@ -92,20 +92,20 @@ class JobExecutionTests extends FlatSpec with Matchers {
val data = job.getNextActionData
- data.name should be ("step2")
+ data.getName should be ("step2")
// making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/${jobId}/task-0000000001")
+ val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
new String(actionStatus) should be("started")
}
it should "be marked as failed when JobManager. is called" in {
job.actionFailed("0000000001", "test failure")
- queue.peek.name should be ("error-action")
+ queue.peek.getName should be ("error-action")
// making sure that the status is reflected in zk
- val actionStatus =
client.getData.forPath(s"/${jobId}/task-0000000001-error")
+ val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
new String(actionStatus) should be("queued")
// and returned by getNextActionData
diff --git
a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
index 3a347c1f..13685f91 100755
---
a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
+++
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
@@ -55,9 +55,9 @@ class JobParserTests extends FlatSpec with Matchers {
job.registeredActions.size should be(3)
- job.registeredActions.get("0000000000").get.data.name should be("start")
- job.registeredActions.get("0000000001").get.data.name should be("step2")
- job.registeredActions.get("0000000001-error").get.data.name should
be("error-action")
+ job.registeredActions.get("0000000000").get.data.getName should be("start")
+ job.registeredActions.get("0000000001").get.data.getName should be("step2")
+ job.registeredActions.get("0000000001-error").get.data.getName should
be("error-action")
}
diff --git
a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
index eb08942f..64887ab6 100755
---
a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
+++
b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
@@ -68,7 +68,7 @@ class JobRestoreTests extends FlatSpec with Matchers with
BeforeAndAfterEach {
JobLoader.restoreJobState(manager, jobId, client)
- queue.peek.name should be("start")
+ queue.peek.getName should be("start")
}
"a restored job" should "have all started actions in the executionQueue" in {
@@ -78,6 +78,6 @@ class JobRestoreTests extends FlatSpec with Matchers with
BeforeAndAfterEach {
JobLoader.restoreJobState(manager, jobId, client)
- queue.peek.name should be("start")
+ queue.peek.getName should be("start")
}
}
\ No newline at end of file
diff --git
a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
index 0e321f01..7eba1daf 100644
--- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
@@ -26,7 +26,7 @@ class HttpServerTests extends FlatSpec with Matchers {
// this is an ugly hack, getClass.getResource("/").getPath should have
worked but
// stopped working when we moved to gradle :(
- val resources = new
File(getClass.getResource("/simple-maki.yml").getPath).getParent
+ val resources: String = new
File(getClass.getResource("/simple-maki.yml").getPath).getParent
// "Jetty Web server" should "start HTTP server, serve content and stop
successfully" in {
//
diff --git a/sdk/build.gradle b/sdk/build.gradle
index 581ea081..c5378b88 100644
--- a/sdk/build.gradle
+++ b/sdk/build.gradle
@@ -15,7 +15,7 @@
* limitations under the License.
*/
apply plugin: 'java'
-
+apply plugin: "kotlin"
repositories {
mavenCentral()
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services