[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-151264212 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-151264217 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44369/ Test PASSed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-151263991 **[Test build #44369 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44369/consoleFull)** for PR 8744 at commit [`bb01453`](https://github.com/apache/spark/commit/bb014535c4987f73398f48973efe380f94ab4ece). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-151220033 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-151220009 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-151220647 **[Test build #44369 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44369/consoleFull)** for PR 8744 at commit [`bb01453`](https://github.com/apache/spark/commit/bb014535c4987f73398f48973efe380f94ab4ece).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/8744#discussion_r42784907
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala ---
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.history.yarn
+
+import java.io.IOException
+import java.net.{URI, URL}
+import java.text.DateFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{ArrayList => JArrayList, Collection => JCollection, Date, HashMap => JHashMap, Map => JMap}
+import java.{lang, util}
+
+import scala.collection.JavaConversions._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.service.Service
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.json4s.JsonAST.JObject
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent}
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+  /**
+   * What attempt ID to use as the attempt ID field (not the entity ID) when
+   * there is no attempt info
+   */
+  val SINGLE_ATTEMPT = "1"
+
+
+  /**
+   * Exception text when there is no event info data to unmarshall
+   */
+  val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+  /**
+   * Exception text when there is event info entry in the timeline event, but it is empty
+   */
+
+  val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+  /**
+   * counter incremented on every spark event to timeline event creation,
+   * so guaranteeing uniqueness of event IDs across a single application attempt
+   * (which is implicitly, one per JVM)
+   */
+  val uid = new AtomicLong(System.currentTimeMillis())
+
+  /**
+   * Converts a Java object to its equivalent json4s representation.
+   */
+  def toJValue(obj: Object): JValue = {
+    obj match {
+      case str: String => JString(str)
+      case dbl: java.lang.Double => JDouble(dbl)
+      case dec: java.math.BigDecimal => JDecimal(dec)
+      case int: java.lang.Integer => JInt(BigInt(int))
+      case long: java.lang.Long => JInt(BigInt(long))
+      case bool: java.lang.Boolean => JBool(bool)
+      case map: JMap[_, _] =>
+        val jmap = map.asInstanceOf[JMap[String, Object]]
+        JObject(jmap.entrySet().map { e => (e.getKey() -> toJValue(e.getValue())) }.toList)
+      case array: JCollection[_] =>
+        JArray(array.asInstanceOf[JCollection[Object]].map(o => toJValue(o)).toList)
+      case null => JNothing
+    }
+  }
+
+  /**
+   * Converts a JValue into its Java equivalent.
+   */
+  def toJavaObject(v: JValue): Object = {
+    v match {
+      case JNothing => null
+      case JNull => null
+      case JString(s) => s
+      case JDouble(num) => java.lang.Double.valueOf(num)
+      case JDecimal(num) => num.bigDecimal
+      case JInt(num) => java.lang.Long.valueOf(num.longValue)
+      case JBool(value) =>
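The `uid` field in the quoted diff seeds an `AtomicLong` with the current time so that event IDs stay unique within one application attempt. A minimal sketch of how such a counter could hand out IDs, using only the standard library; the `nextEventId` helper, the ID format, and `distinctIds` are hypothetical and not part of the patch:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object EventIdSketch {
  // Seeded with the wall-clock time at initialization, as in the quoted patch.
  val uid = new AtomicLong(System.currentTimeMillis())

  // Hypothetical helper (not in the patch): incrementAndGet is atomic, so
  // no two callers in this JVM can receive the same value.
  def nextEventId(prefix: String): String = s"$prefix-${uid.incrementAndGet()}"

  // Generate `perThread` IDs on each of `threads` threads and count the distinct ones.
  def distinctIds(threads: Int, perThread: Int): Int = {
    val seen = ConcurrentHashMap.newKeySet[String]()
    val workers = (1 to threads).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit =
          (1 to perThread).foreach(_ => seen.add(nextEventId("event")))
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    seen.size()
  }

  def main(args: Array[String]): Unit = {
    println(distinctIds(threads = 4, perThread = 250)) // prints 1000
  }
}
```

The guarantee only covers a single JVM, which matches the "one per JVM" wording in the quoted comment; the time-based seed merely makes a clash between separate attempts unlikely.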
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/8744#discussion_r42667047
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID; this will be None if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var
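The quoted class keeps its service state in an `AtomicInteger`, and `enterState` returns the previous state through `getAndSet`. A small sketch of that pattern in isolation; the numeric values of the state constants and the `stopOnce` helper are assumptions made for illustration, since this excerpt does not show how the patch defines `CreatedState` or how it uses the returned value:

```scala
import java.util.concurrent.atomic.AtomicInteger

object ServiceStateSketch {
  // Hypothetical state values; the patch's real constants are not visible here.
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

class ServiceStateSketch {
  import ServiceStateSketch._

  private val state = new AtomicInteger(CreatedState)

  def serviceState: Int = state.get()

  /** Atomically switch to `newState` and return the state that was replaced. */
  def enterState(newState: Int): Int = state.getAndSet(newState)

  /**
   * Illustrative helper: only the caller that actually moves the service
   * out of a non-stopped state gets `true`, so shutdown work runs once.
   */
  def stopOnce(): Boolean = enterState(StoppedState) != StoppedState
}

object ServiceStateDemo {
  def main(args: Array[String]): Unit = {
    val service = new ServiceStateSketch
    val previous = service.enterState(ServiceStateSketch.StartedState)
    println(s"previous=$previous current=${service.serviceState}")
    println(service.stopOnce()) // first stop request wins
    println(service.stopOnce()) // later requests see the stopped state
  }
}
```

Because `getAndSet` is a single atomic operation, two threads racing to stop the service cannot both observe the pre-stop state.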
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42668737 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala --- @@ -0,0 +1,720 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/8744#discussion_r42669119
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala ---
@@ -0,0 +1,765 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.IOException
+import java.net.{InetSocketAddress, NoRouteToHostException, URI, URL}
+import java.text.DateFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{ArrayList => JArrayList, Collection => JCollection, Date, HashMap => JHashMap, Map => JMap}
+import java.{lang, util}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.json4s.JsonAST.JObject
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+  /**
+   * What attempt ID to use as the attempt ID field (not the entity ID) when
+   * there is no attempt info.
+   */
+  val SINGLE_ATTEMPT = "1"
+
+  /**
+   * Exception text when there is no event info data to unmarshall.
+   */
+  val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+  /**
+   * Exception text when there is event info entry in the timeline event, but it is empty.
+   */
+
+  val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+  /**
+   * counter incremented on every spark event to timeline event creation,
+   * so guaranteeing uniqueness of event IDs across a single application attempt
+   * (which is implicitly, one per JVM).
+   */
+  val uid = new AtomicLong(System.currentTimeMillis())
+
+  /**
+   * Converts a Java object to its equivalent json4s representation.
+   */
+  def toJValue(obj: Object): JValue = {
+    obj match {
+      case str: String => JString(str)
+      case dbl: java.lang.Double => JDouble(dbl)
+      case dec: java.math.BigDecimal => JDecimal(dec)
+      case int: java.lang.Integer => JInt(BigInt(int))
+      case long: java.lang.Long => JInt(BigInt(long))
+      case bool: java.lang.Boolean => JBool(bool)
+      case map: JMap[_, _] =>
+        val jmap = map.asInstanceOf[JMap[String, Object]]
+        JObject(jmap.entrySet().asScala.map { e => e.getKey -> toJValue(e.getValue) }.toList)
+      case array: JCollection[_] =>
+        JArray(array.asInstanceOf[JCollection[Object]].asScala.map(o => toJValue(o)).toList)
+      case null => JNothing
+    }
+  }
+
+  /**
+   * Converts a JValue into its Java equivalent.
+   */
+  def toJavaObject(v: JValue): Object = {
+    v match {
+      case JNothing => null
+      case JNull => null
+      case JString(s) => s
+      case JDouble(num) => java.lang.Double.valueOf(num)
+      case JDecimal(num) => num.bigDecimal
+      case JInt(num) => java.lang.Long.valueOf(num.longValue())
+      case JBool(value) =>
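This revision of the file imports `scala.collection.JavaConverters._` rather than `JavaConversions._`, so every Java collection is converted explicitly with `.asScala` instead of through an implicit conversion. A minimal sketch of that idiom using only the standard library; the `describe` and `lengths` helpers and the sample data are illustrative and do not come from the patch:

```scala
import java.util.{ArrayList => JArrayList, Collection => JCollection, HashMap => JHashMap, Map => JMap}

import scala.collection.JavaConverters._

object AsScalaSketch {
  // Illustrative helper: render a Java map as a sorted list of "key=value" strings.
  // The .asScala call marks exactly where the Java collection becomes a Scala one.
  def describe(map: JMap[String, Object]): List[String] =
    map.entrySet().asScala.map(e => s"${e.getKey}=${e.getValue}").toList.sorted

  // Illustrative helper: the same explicit conversion for a Java collection.
  def lengths(items: JCollection[String]): List[Int] =
    items.asScala.map(_.length).toList

  def main(args: Array[String]): Unit = {
    val jmap = new JHashMap[String, Object]()
    jmap.put("stage", Integer.valueOf(3))
    jmap.put("name", "job-1")
    println(describe(jmap)) // prints List(name=job-1, stage=3)

    val jlist = new JArrayList[String]()
    jlist.add("start")
    jlist.add("end")
    println(lengths(jlist)) // prints List(5, 3)
  }
}
```

On Scala 2.13 and later the same `.asScala` extension methods are provided by `scala.jdk.CollectionConverters`, and `JavaConverters` is deprecated there.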
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42674559 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala --- @@ -0,0 +1,720 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149643277 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43986/ Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42537310 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149643275 Merged build finished. Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42538300
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * Start time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting*/
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42539302
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149642585
**[Test build #43986 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43986/consoleFull)** for PR 8744 at commit [`0dc3f13`](https://github.com/apache/spark/commit/0dc3f13339c2223dead09e4bb59d538593438c0e).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse`
   * `trait SchedulerExtensionService`
   * `case class SchedulerExtensionServiceBinding(`
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42531200
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42540050
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42539027
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149608844
**[Test build #43975 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43975/consoleFull)** for PR 8744 at commit [`20216b4`](https://github.com/apache/spark/commit/20216b435b25a4757775cacec1b01882b6eacb31).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r42525776
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
--- End diff --

will do; will go to upper case word & review all javadocs to verify
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149610234 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43975/ Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149610233 Merged build finished. Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149597210 **[Test build #43986 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43986/consoleFull)** for PR 8744 at commit [`0dc3f13`](https://github.com/apache/spark/commit/0dc3f13339c2223dead09e4bb59d538593438c0e).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149593229 Merged build finished. Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149593131 **[Test build #43978 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43978/consoleFull)** for PR 8744 at commit [`3b7d31b`](https://github.com/apache/spark/commit/3b7d31b2a0f913140ced29285a8f26c46c24928f). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `trait SchedulerExtensionService ` * `case class SchedulerExtensionServiceBinding(`
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149593230 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43978/ Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149595578 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149595658 The test failed because a class, `StubApplicationReport`, had been deleted locally but the deletion had not been pushed. The YARN `ApplicationReport` class has grown with every Hadoop release, so you can't have a stub/mock one that builds across Hadoop 2.x versions. The latest history-server tests have cut the class and instead construct `ApplicationReportPBImpl` directly. I've also filed [YARN-4279](https://issues.apache.org/jira/browse/YARN-4279) to at least annotate the simple, stable attempt ID and application ID class constructors as public. If MapReduce uses them, any YARN app should be able to.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149595521 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149563274 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149563254 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149564509 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149564531 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149567517 **[Test build #43978 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43978/consoleFull)** for PR 8744 at commit [`3b7d31b`](https://github.com/apache/spark/commit/3b7d31b2a0f913140ced29285a8f26c46c24928f).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-149567694 **[Test build #43975 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43975/consoleFull)** for PR 8744 at commit [`20216b4`](https://github.com/apache/spark/commit/20216b435b25a4757775cacec1b01882b6eacb31).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r41951480 --- Diff: yarn/pom.xml --- @@ -164,6 +164,92 @@ + + --- End diff -- Can this say "The YARN application server..."? There is already a different component in Spark called the history server.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147746836 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147746916 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147749231 I've pushed out a rebase of this against master.

1. The asynchronous publishing of events, with retries and, eventually, dropping of events, appears to be stable, at least during unit and integration tests.
1. scalastyle is set up to also scan the yarn/history source trees (the code is still there), and is happy.
1. I've renamed the service to `SchedulerExtensionService`, as I proposed earlier. That helps distinguish it from 'YARN services', which are entirely independent.
1. I've also moved the test of extension service instantiation into the `yarn/src/test` source tree as `org.apache.spark.scheduler.cluster.ExtensionServiceIntegrationSuite`.

With that isolated test, I can submit the extension as a self-contained patch for independent review. Note how the extension service takes a case class, `SchedulerExtensionServiceBinding`, as its binding, rather than a series of parameters. That's so that in future, if new binding attributes were added (currently: context, YARN app ID and YARN attempt ID), it would remain backwards compatible with existing scheduler extensions.
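The binding design described in the comment above can be sketched as follows. This is only an illustration under stated assumptions: `ExtensionBinding`, `ExtensionService` and `RecordingExtensionService` are hypothetical names, and plain strings stand in for the Spark context and the YARN application and attempt IDs so that the sketch runs without Spark or Hadoop on the classpath. It is not the code in the pull request.

```scala
// Hypothetical stand-in for the binding. The real case class is said to carry
// the Spark context, the YARN application ID and the YARN attempt ID; strings
// are used here purely for illustration.
final case class ExtensionBinding(
    contextName: String,
    applicationId: String,
    attemptId: Option[String] = None)

// Hypothetical extension-service contract: a single binding argument instead
// of a list of individual parameters.
trait ExtensionService {
  def start(binding: ExtensionBinding): Unit
  def stop(): Unit
}

// Example implementation that only reads the binding fields it needs.
class RecordingExtensionService extends ExtensionService {
  @volatile private var bound: Option[ExtensionBinding] = None

  override def start(binding: ExtensionBinding): Unit = {
    bound = Some(binding)
  }

  override def stop(): Unit = {
    bound = None
  }

  // Summarises the current binding, or reports that none is set.
  def description: String = bound match {
    case Some(b) => s"${b.applicationId}/${b.attemptId.getOrElse("no-attempt")}"
    case None => "unbound"
  }
}
```

Because `start` takes the binding as one value, adding a further field to the case class would not change the signature that implementations override, which is presumably the compatibility property the comment is after. Code that constructs the binding would still need to be updated or recompiled.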
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147749964 [Test build #43644 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43644/consoleFull) for PR 8744 at commit [`aac3a1d`](https://github.com/apache/spark/commit/aac3a1d5ab4557e02261155452192ac37b6a4c0a).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147783828 @steveloughran anything that can break these patches into smaller PRs for review is welcome.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147784582 [Test build #43644 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43644/console) for PR 8744 at commit [`aac3a1d`](https://github.com/apache/spark/commit/aac3a1d5ab4557e02261155452192ac37b6a4c0a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `trait SchedulerExtensionService ` * `case class SchedulerExtensionServiceBinding(`
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147784673 Merged build finished. Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-147784706 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43644/ Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-145818164 @tgravescs: ATS v1. What we got from the ATS team was a commitment not to break that API at some ws/v1/ endpoint they'll continue to offer. However, v2 is designed to address other things (like availability) as well as scale; 1.x clients won't get that. This implementation is designed to keep load on ATS down (you can tell it not to do any refresh in the background), but an 8GB event history, while it should be handled by leveldb, is a pretty serious JSON ser/deser load at both ends. I'd like to move to background playback, where getAppUI() returns the UI but the UI is built up asynchronously, as you'd get from a running app. The filesystem history provider could do the same thing.
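The background-playback idea in the comment above might look roughly like the sketch below. Everything here is an assumption made for illustration: `ReplayedUI`, `AsyncPlayback` and this `getAppUI` signature are hypothetical and do not reflect the Spark history provider API. The sketch only shows the shape of the proposal, namely handing back a UI object immediately while a `Future` fills it in.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a UI that fills in as events are replayed.
final class ReplayedUI {
  private val events = new CopyOnWriteArrayList[String]()
  def add(event: String): Unit = events.add(event)
  def eventCount: Int = events.size()
}

object AsyncPlayback {
  // Returns the (initially empty) UI at once, together with a Future that
  // completes when every stored event has been replayed into it.
  def getAppUI(history: Seq[String])(
      implicit ec: ExecutionContext): (ReplayedUI, Future[Unit]) = {
    val ui = new ReplayedUI
    val replay = Future {
      history.foreach(event => ui.add(event))
    }
    (ui, replay)
  }
}
```

A caller could render the returned UI straight away and refresh it as `eventCount` grows, or wait on the `Future` (for example with `Await.result(done, 10.seconds)`) when a complete view is required.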
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user tgravescs commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-144080509 Sorry, I haven't had time to follow all of these changes. Which version of ATS are you testing this on, and at what scale? I'm assuming ATS v1, since v2 is still in development. We have been using ATS with Tez and it can't handle nearly the load we need it to. With Spark I see cases where that just gets worse if you have longer-running iterative things; I have seen history files around 8GB in size on Spark before. I thought ATS v2 was going to try to keep API compatibility, but I don't think it's guaranteed. Obviously it doesn't hurt to get the framework in place, and I assume it will work for smaller clusters/# jobs. Have you looked at ATS v2 at all to know if the changes to go to it would be fairly minimal? Mostly just curious where this is at, as I'm seeing a bunch of small issues with the Spark history server.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601188
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * Start time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting*/
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40605842 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40606009 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID; this will be None if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * End time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting */ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
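The "simple state model implemented in an atomic integer" quoted in the diff above can be sketched in isolation as follows. This is a minimal illustration, not the PR's code: only `CreatedState`, `serviceState` and `enterState` appear in the quoted diff, while `StartedState`, `StoppedState`, the numeric values, and the `StateModel` and `StateModelDemo` names are assumptions made for the example.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical state constants; the PR's real values are not visible in the quoted diff.
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1 // assumed name
  val StoppedState = 2 // assumed name
}

class StateModel {
  import ServiceStates._

  private val state = new AtomicInteger(CreatedState)

  /** Current state. */
  def serviceState: Int = state.get()

  /** Atomically switch to the new state and return the previous one. */
  def enterState(newState: Int): Int = state.getAndSet(newState)
}

object StateModelDemo extends App {
  import ServiceStates._

  val model = new StateModel
  val previous = model.enterState(StartedState)
  // Only a caller that observes CreatedState as the previous value was first to start.
  if (previous == CreatedState) println("first start")
}
```

Because `getAndSet` hands back the previous value, two threads racing to enter the same state can each tell whether they were the one that performed the transition, without an extra lock.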
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40609120 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40609137 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40600846 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history.yarn + +import org.apache.spark.scheduler._ +import org.apache.spark.util.{Clock, SystemClock} +import org.apache.spark.{Logging, SparkContext, SparkFirehoseListener} + +/** + * Spark listener which queues up all received events to the [[YarnHistoryService]] passed + * as a constructor. There's no attempt to filter event types at this point. + * + * @param sc context + * @param service service to forward events to + */ +private[spark] class YarnEventListener(sc: SparkContext, service: YarnHistoryService) + extends SparkFirehoseListener with Logging { + + private val clock: Clock = new SystemClock() --- End diff -- Not used?
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40600885 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} --- End diff -- nit: ordering
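The nit does not say which ordering is meant. Assuming it refers to listing the selectors inside the braces alphabetically, the flagged import would look roughly like the sketch below. The `ImportOrderExample` object and its fields are hypothetical and exist only so that each imported name is referenced.

```scala
// Selectors inside the braces listed alphabetically (assumed to be what the nit asks for).
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}

object ImportOrderExample {
  // Trivial uses so that every imported name is referenced.
  val flag = new AtomicBoolean(false)
  val count = new AtomicInteger(0)
  val total = new AtomicLong(0L)
}
```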
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40606368 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40608807 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143882558 A lot of the logic in `YarnHistoryService` has changed since the last review, so I may have missed some things. I haven't had time to go through the rest yet.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601372 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40608080 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40608423 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * Start time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting*/ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40600962 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { --- End diff -- nit: add empty line
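For reference, a minimal sketch of the state model quoted above, with the blank line between the two methods that the nit asks for. It is not the PR's code: the class name `StateModel`, the object `ServiceStates`, the constants `StartedState` and `StoppedState`, and all numeric values are assumptions made for illustration, and the `logDebug` call is left out so the sketch compiles without Spark.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical state constants. The real ones live in the companion object
// of YarnHistoryService, which the quoted diff does not show.
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

// Hypothetical stand-in for the state-tracking part of YarnHistoryService.
class StateModel {
  import ServiceStates._

  /** Simple state model implemented in an atomic integer. */
  private val _serviceState = new AtomicInteger(CreatedState)

  /** Current state of the service. */
  def serviceState: Int = {
    _serviceState.get()
  }

  /** Atomically enter a new state and return the previous one. */
  def enterState(state: Int): Int = {
    _serviceState.getAndSet(state)
  }
}
```

Because `getAndSet` returns the old value, a caller can check which state the service was in before the transition without a separate read.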
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40606942 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * Start time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting*/ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40607349 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * Start time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting*/ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40609349 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * Start time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting*/ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601106 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** --- End diff -- nit: please be consistent re: single line vs. multi-line docs, and starting with capital or lower case letters.
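One way to read the consistency nit, sketched on a few of the quoted fields: every doc comment is a single-line Scaladoc that starts with a capital letter and ends with a period. This is only an illustration of one possible convention, not the style the PR settled on. The class name `HistoryFields` is hypothetical, and the fields are made public and limited to plain Scala types so the sketch compiles without Spark or Hadoop.

```scala
// Hypothetical stand-in for a subset of the fields in the quoted diff.
class HistoryFields {
  /** Application name from the Spark start event. */
  var applicationName: String = _

  /** Spark application ID, if known. */
  var sparkApplicationId: Option[String] = None

  /** Attempt ID from the Spark start event, if any. */
  var sparkApplicationAttemptId: Option[String] = None

  /** Start time of the application, as received in the start event. */
  var startTime: Long = _
}
```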
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601587 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * Start time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting*/ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r40606120

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
    @@ -0,0 +1,1048 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
    +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
    +import org.apache.spark.util.{SystemClock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * A Yarn Extension Service to post lifecycle events to a registered
    + * YARN Timeline Server.
    + */
    +private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +
    +  /** Simple state model implemented in an atomic integer */
    +  private val _serviceState = new AtomicInteger(CreatedState)
    +
    +  def serviceState: Int = {
    +    _serviceState.get()
    +  }
    +  def enterState(state: Int): Int = {
    +    logDebug(s"Entering state $state from $serviceState")
    +    _serviceState.getAndSet(state)
    +  }
    +
    +  /**
    +   * Spark context; valid once started
    +   */
    +  private var sparkContext: SparkContext = _
    +
    +  /** YARN configuration from the spark context */
    +  private var config: YarnConfiguration = _
    +
    +  /** application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** attempt ID this will be null if the service is started in yarn-client mode */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client */
    +  private var _timelineClient: Option[TimelineClient] = None
    +
    +  /** registered event listener */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Application name from the spark start event */
    +  private var applicationName: String = _
    +
    +  /** Application ID*/
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional Attempt ID from the spark start event */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** user name as derived from `SPARK_USER` env var or `UGI` */
    +  private var userName = Utils.getCurrentUserName()
    +
    +  /** Clock for recording time */
    +  private val clock = new SystemClock()
    +
    +  /**
    +   * Start time of the application, as received in the start event.
    +   */
    +  private var startTime: Long = _
    +
    +  /**
    +   * Start time of the application, as received in the end event.
    +   */
    +  private var endTime: Long = _
    +
    +  /** number of events to batch up before posting*/
    +  private var _batchSize = DEFAULT_BATCH_SIZE
    +
    +  /** queue of entities to asynchronously post, plus the number of events in each entry */
    +  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
    +
    +  /** limit on the total number of events permitted */
    +  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
    +
    +  /**
    +   * List of events which will be pulled into a timeline
    +   * entity when created
    +   */
    +  private var pendingEvents = new
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r40606029

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
    @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r40606779

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
    @@ -0,0 +1,1048 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143024215 [Test build #42976 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42976/consoleFull) for PR 8744 at commit [`231301a`](https://github.com/apache/spark/commit/231301a943f449db1ccf46573f2e98a47034c797).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143021446 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143021415 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143057982 Merged build finished. Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143057931

[Test build #42976 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42976/console) for PR 8744 at commit [`231301a`](https://github.com/apache/spark/commit/231301a943f449db1ccf46573f2e98a47034c797).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `trait YarnExtensionService `
   * `case class YarnExtensionServiceBinding(`
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-143057985 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42976/ Test FAILed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-142672107 [Test build #42912 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42912/consoleFull) for PR 8744 at commit [`103047c`](https://github.com/apache/spark/commit/103047cb989669ae7b666b4ba52d728fed5b1b88).
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-142670534 Merged build started.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-142670505 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-142711558 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42912/ Test PASSed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-142711556 Merged build finished. Test PASSed.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8744#issuecomment-142711417

[Test build #42912 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42912/console) for PR 8744 at commit [`103047c`](https://github.com/apache/spark/commit/103047cb989669ae7b666b4ba52d728fed5b1b88).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `trait YarnExtensionService `
   * `case class YarnExtensionServiceBinding(`
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r39633153

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala ---
    @@ -0,0 +1,142 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import org.apache.spark.scheduler._
    +import org.apache.spark.util.{SystemClock, Clock}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * Spark listener which queues up all received events to the [[YarnHistoryService]] passed
    + * as a constructor. There's no attempt to filter event types at this point.
    + * @param sc context
    + * @param service service to forward events to
    + */
    +private[spark] class YarnEventListener(sc: SparkContext, service: YarnHistoryService)
    +  extends SparkListener with Logging {
    --- End diff --

    Why yes it would, thank you for pointing that class out.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r39635426

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
    @@ -0,0 +1,981 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.history.yarn
    +
    +import java.net.{ConnectException, URI}
    +import java.util.concurrent.LinkedBlockingQueue
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
    +
    +import scala.collection.mutable.LinkedList
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.security.UserGroupInformation
    +import org.apache.hadoop.service.{AbstractService, Service}
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse}
    +import org.apache.hadoop.yarn.client.api.TimelineClient
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
    +import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.YarnExtensionService
    +import org.apache.spark.util.{SystemClock, Clock, Utils}
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * Implements a Hadoop service with the init/start logic replaced by that
    + * of the YarnService.
    + *
    + * As [[AbstractService]] implements `close()`, routing
    + * to its `stop` method, calling `close()` is sufficient
    + * to stop the service instance.
    + *
    + * However, when registered to receive spark events, the service will continue to
    + * receive them until the spark context is stopped. Events received when this service
    + * is in a `STOPPED` state will be discarded.
    + */
    +private[spark] class YarnHistoryService extends AbstractService("History Service")
    +  with YarnExtensionService with Logging {
    +
    +  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
    +  /**
    +   * Spark context; valid once started
    +   */
    +  private var sparkContext: SparkContext = _
    +
    +  /** application ID. */
    +  private var _applicationId: ApplicationId = _
    +
    +  /** attempt ID this will be null if the service is started in yarn-client mode */
    +  private var _attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** YARN timeline client */
    +  private var timelineClient: Option[TimelineClient] = None
    +
    +  /** registered event listener */
    +  private var listener: Option[YarnEventListener] = None
    +
    +  /** Applicaton name */
    +  private var appName: String = _
    +
    +  /** Application ID from the spark start event */
    +  private var sparkApplicationId: Option[String] = None
    +
    +  /** Optional Attempt ID from the spark start event */
    +  private var sparkApplicationAttemptId: Option[String] = None
    +
    +  /** user name as derived from `SPARK_USER` env var or `UGI` */
    +  private var userName: String = Utils.getCurrentUserName
    +
    +  /**
    +   * Clock for recording time
    +   */
    +  private val clock: Clock = new SystemClock()
    +
    +  /**
    +   * Start time of the application, as received in the start event.
    +   */
    +  private var startTime: Long = _
    +
    +  /** number of events to batch up before posting*/
    +  private var batchSize: Int = DEFAULT_BATCH_SIZE
    +
    +  /** queue of actions*/
    +  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
    +
    +  /** cache layer to handle timeline client failure.*/
    +  private var entityList = new LinkedList[TimelineEntity]
    +
    +  /** current entity; wil be created on demand. */
    +  private var curEntity: Option[TimelineEntity] = None
    +
    +  /** Has a start event been processed? */
    +  private val appStartEventProcessed = new AtomicBoolean(false)
    +
    +  /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r39635411

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
    @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8744#discussion_r39638971

    --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
    @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39634546 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") --- End diff -- 1. It makes it easier to aggregate services, and it has a robust model for thread-safe subclassing through a lifecycle. Most of YARN is a composition of these; in fact the Spark YARN client and AM could adopt it for better management of `AMRMClient`, `YarnClient`, `YarnExtensionServices` and the like. 2. The hive-thriftserver code *almost* uses it. 
More specifically, it uses a cut-and-paste of the Hadoop 2.0.4-alpha era code and then abuses Java reflection to bypass bits of the direct parent. Only once Hive moves to Hadoop 2.2+ can it be moved back into the YARN model, though once Hive 1.2.x is modified to provide override points, the spark-hivethriftserver module no longer needs the `ReflectedCompositeService` hack.
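To make the lifecycle argument concrete, here is a rough, self-contained sketch of the pattern being described: subclasses override hooks, the base class owns the state transitions, `close()` routes to `stop()`, and a composite stops its children in reverse order. Everything in it (`MiniService`, `MiniComposite`, `EventSink`, `LifecycleSketch`) is a hypothetical stand-in written for illustration; it is not the Hadoop `AbstractService`/`CompositeService` code and not the code in this patch.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a service lifecycle; NOT the real Hadoop API.
object LifecycleSketch {
  sealed trait State
  case object NotInited extends State
  case object Inited extends State
  case object Started extends State
  case object Stopped extends State

  abstract class MiniService(val name: String) extends AutoCloseable {
    private var current: State = NotInited

    def state: State = synchronized { current }

    // Subclasses override these hooks, never the lifecycle methods themselves.
    protected def serviceInit(): Unit = {}
    protected def serviceStart(): Unit = {}
    protected def serviceStop(): Unit = {}

    final def init(): Unit = synchronized {
      require(current == NotInited, s"$name: cannot init from $current")
      serviceInit()
      current = Inited
    }

    final def start(): Unit = synchronized {
      require(current == Inited, s"$name: cannot start from $current")
      serviceStart()
      current = Started
    }

    // stop() is idempotent and may be called from any state.
    final def stop(): Unit = synchronized {
      if (current != Stopped) {
        current = Stopped
        serviceStop()
      }
    }

    // close() simply routes to stop().
    override def close(): Unit = stop()
  }

  // Aggregates children: init/start in order, stop in reverse order.
  class MiniComposite(label: String) extends MiniService(label) {
    private val children = ArrayBuffer.empty[MiniService]
    def addService(s: MiniService): Unit = children += s
    override protected def serviceInit(): Unit = children.foreach(_.init())
    override protected def serviceStart(): Unit = children.foreach(_.start())
    override protected def serviceStop(): Unit = children.reverse.foreach(_.stop())
  }

  // Accepts events only while started; anything else is discarded.
  class EventSink(label: String, log: ArrayBuffer[String]) extends MiniService(label) {
    val accepted = ArrayBuffer.empty[String]
    override protected def serviceStart(): Unit = log += s"start $label"
    override protected def serviceStop(): Unit = log += s"stop $label"
    def onEvent(event: String): Boolean =
      if (state == Started) { accepted += event; true } else false
  }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer.empty[String]
    val a = new EventSink("a", log)
    val b = new EventSink("b", log)
    val parent = new MiniComposite("parent")
    parent.addService(a)
    parent.addService(b)
    parent.init()
    parent.start()
    a.onEvent("job-start") // accepted while started
    parent.close()         // routes to stop(), children stopped in reverse
    a.onEvent("job-end")   // discarded: the sink is already stopped
    println(log.mkString(", "))
  }
}
```

The point of the sketch is only that the state machine lives in one place, so each child gets consistent start/stop ordering and late events are dropped rather than processed after shutdown.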
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39637541 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Applicaton name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting*/ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions*/ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure.*/ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; wil be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39635159 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} --- End diff -- Sorry, I'd thought IDEA was getting it right. I'll review all of them and see if that third-party organiser helps. Keeping the imports organised is critical given how brittle they are across patches.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39638560 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Applicaton name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting*/ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions*/ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure.*/ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; wil be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39638651 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Applicaton name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting*/ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions*/ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure.*/ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; wil be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39638893 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Applicaton name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting*/ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions*/ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure.*/ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; wil be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39637695 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Applicaton name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting*/ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions*/ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure.*/ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; wil be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39638905 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} + +import scala.collection.mutable.LinkedList +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.service.{AbstractService, Service} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.YarnExtensionService +import org.apache.spark.util.{SystemClock, Clock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * Implements a Hadoop service with the init/start logic replaced by that + * of the YarnService. + * + * As [[AbstractService]] implements `close()`, routing + * to its `stop` method, calling `close()` is sufficient + * to stop the service instance. + * + * However, when registered to receive spark events, the service will continue to + * receive them until the spark context is stopped. Events received when this service + * is in a `STOPPED` state will be discarded. + */ +private[spark] class YarnHistoryService extends AbstractService("History Service") + with YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name */ + private var appName: String = _ + + /** Application ID from the spark start event */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName: String = Utils.getCurrentUserName + + /** + * Clock for recording time + */ + private val clock: Clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** number of events to batch up before posting */ + private var batchSize: Int = DEFAULT_BATCH_SIZE + + /** queue of actions */ + private val actionQueue = new LinkedBlockingQueue[QueuedAction] + + /** cache layer to handle timeline client failure. */ + private var entityList = new LinkedList[TimelineEntity] + + /** current entity; will be created on demand. */ + private var curEntity: Option[TimelineEntity] = None + + /** Has a start event been processed? */ + private val appStartEventProcessed = new AtomicBoolean(false) + + /* has the application event event been processed
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39643031 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39643240 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39643548 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39644786 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39645121 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r39645278 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,981 @@